Yep, when I increase the number of tasks and the number of nodes executing the tasks, the above issue appears. It creates the topics ( so no issue reaching the clusters ) but does not create the MirrorSourceConnector(s). The process does launch the MirrorCheckpointConnector, one for each node. I took a thread dump, and the only thread I see on poll is, I think, for the config log: org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:262)
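For reference, the task-count rule Ryanne explains further down the thread ( tasks per source->target herder are bounded by both tasks.max and the number of replicated topic-partitions ) can be sketched as follows. This is illustrative Python, not MM2 source:

```python
# Illustrative sketch (not MM2 code): the number of MirrorSourceConnector
# tasks created per source->target herder is bounded by BOTH tasks.max and
# the total number of topic-partitions being replicated.
def effective_source_tasks(tasks_max: int, topic_partitions: int) -> int:
    return min(tasks_max, topic_partitions)

# 8 workers with tasks.max=8 and ~2000 partitions: only 8 tasks cluster-wide,
# so adding workers beyond 8 leaves them idle.
print(effective_source_tasks(8, 2000))   # -> 8
# Raising tasks.max to 48 lets work spread across 12 workers.
print(effective_source_tasks(48, 2000))  # -> 48
```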
On Thu, Oct 24, 2019 at 10:36 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote: > I might have created a build from the trunk, rather then the 2.4 branch , > but will confirm. > > On Thu, Oct 24, 2019 at 4:44 PM Vishal Santoshi <vishal.santo...@gmail.com> > wrote: > >> The above may not be an issue as in it just uses the returned class >> loader to resolve the Connector I think . What is not obvious, why it does >> not go ahead and consume .. >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,571] INFO refreshing >> known target topics took 15 ms >> (org.apache.kafka.connect.mirror.Scheduler:95) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Started >> MirrorSourceConnector with 120 topic-partitions. >> (org.apache.kafka.connect.mirror.MirrorSourceConnector:121) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Starting >> MirrorSourceConnector took 160 ms. >> (org.apache.kafka.connect.mirror.MirrorSourceConnector:122) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,577] INFO Finished >> creating connector MirrorSourceConnector >> (org.apache.kafka.connect.runtime.Worker:272) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,578] ERROR Plugin class >> loader for connector: >> 'org.apache.kafka.connect.mirror.MirrorSourceConnector' was not found. 
>> Returning: >> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621 >> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,579] INFO >> SourceConnectorConfig values: >> >> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart >> >> [mm2-dev-749469cf68-vpm2l] connector.class = >> org.apache.kafka.connect.mirror.MirrorSourceConnector >> >> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false >> >> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false >> >> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000 >> >> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0 >> >> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none >> >> [mm2-dev-749469cf68-vpm2l] header.converter = null >> >> [mm2-dev-749469cf68-vpm2l] key.converter = null >> >> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector >> >> [mm2-dev-749469cf68-vpm2l] tasks.max = 48 >> >> [mm2-dev-749469cf68-vpm2l] transforms = [] >> >> [mm2-dev-749469cf68-vpm2l] value.converter = null >> >> [mm2-dev-749469cf68-vpm2l] >> (org.apache.kafka.connect.runtime.SourceConnectorConfig:347) >> >> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,580] INFO >> EnrichedConnectorConfig values: >> >> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart >> >> [mm2-dev-749469cf68-vpm2l] connector.class = >> org.apache.kafka.connect.mirror.MirrorSourceConnector >> >> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false >> >> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false >> >> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000 >> >> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0 >> >> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none >> >> [mm2-dev-749469cf68-vpm2l] header.converter = null >> >> [mm2-dev-749469cf68-vpm2l] key.converter = null >> >> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector >> >> [mm2-dev-749469cf68-vpm2l] tasks.max = 48 >> >> 
[mm2-dev-749469cf68-vpm2l] transforms = [] >> [mm2-dev-749469cf68-vpm2l] value.converter = null >> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347) >> >> And from there on, nothing. >> >> On Thu, Oct 24, 2019 at 3:02 PM Vishal Santoshi < >> vishal.santo...@gmail.com> wrote: >> >>> Hey Ryanne, >>> >>> Seeing the below ERROR in the logs, and then it seems the process does not consume ( it does not exit with any errors ). And this is intermittent; do it enough times and a relaunch reproduces it :) Is this a known bug? >>> >>> [mm2-dev-58bf5df684-ln9k2] [2019-10-24 18:41:03,067] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165) >>> >>> On Mon, Oct 21, 2019 at 5:16 PM Ryanne Dolan <ryannedo...@gmail.com> >>> wrote: >>>> Vishal, the number of tasks created per source->target herder is determined by both tasks.max and the total number of topic-partitions being replicated. In order to use all 12 worker nodes, you'd need tasks.max >= 12 and number of topic-partitions >= 12. From previous emails it sounds like you have a small number of topic-partitions total (i.e. a small number of topics with a small number of partitions per topic), so I'm guessing that's the reason you aren't seeing more tasks being created. >>>> >>>> Ryanne >>>> >>>> On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi < >>>> vishal.santo...@gmail.com> >>>> wrote: >>>> > Here is what I see: >>>> > >>>> > * The max tasks are a cap on a Connector across the cluster. If I have 8 VMs and 8 max tasks, my assumption that there would be 8 * 8 = 64 task threads was wrong. The logs showed that the partitions were consumed by 8 threads on the 8 VMs ( 1 per VM ), which was highly suboptimal. When I scaled the VMs to 12, it did not matter, as the max tasks still prevented any further distribution. >>>> > >>>> > * If I cancel/resume the cluster with a max task of 48 ( keeping the same job name and thus connector definition ), the max tasks do not change, as in it seems to keep the same max task thread limit ( as in 8 ). >>>> > >>>> > * I can bring down a VM and see the task migrate to a free VM, but the overall count of task threads remains the same. >>>> > >>>> > In essence, the number of tasks is a cap on threads in the cluster per connector. A connector is a source->sink pair that spans a cluster. Thus if we have an A->B DAG and max tasks of 8, then there will be no more than 8 Source Tasks ( threads ) no matter how big the cluster is. It thus makes sense to over-provision ( within the limits of a single VM ) on the max tasks to allow for adding more VMs for scale up. >>>> > >>>> > On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi < >>>> vishal.santo...@gmail.com >>>> > wrote: >>>> > > I misspoke: >>>> > > >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the 8 VMs. I then upscaled to 12 VMs and the tasks *have not* migrated as I would expect. >>>> > > >>>> > > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi < >>>> > vishal.santo...@gmail.com> >>>> > > wrote: >>>> > >> OK, you will have to explain :) >>>> > >> >>>> > >> I had 12 VMs with 8 cpus and 8 max tasks. I thought let me give a CPU to each task, which I presumed is a java thread ( even though I know the thread would be mostly I/O bound ). I saw the issue I pointed out above.
>>>> > >> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to >>>> the the >>>> > >> 8 VMs. I then upscaled to 12 VMs and the tasks migrated as I would >>>> > expect >>>> > >> .* >>>> > >> >>>> > >> I know that a VM will have MirrorSourceConnector and >>>> > >> MirrorHeartbeatConnector tasks up till tasks.max. So a few >>>> questions >>>> > >> >>>> > >> >>>> > >> >>>> > >> * When we say there are 48 max tasks, are we saying there are 48 >>>> > threads >>>> > >> ( in fact 96, each for the 2 groups above, worst case + 2 ) ? >>>> > >> * When we talk about Connector, are we talking about a JVM >>>> process, as >>>> > in >>>> > >> a Connector is a JVM process ? >>>> > >> * Why larger number of tasks.max help the spread ? As in I would >>>> > assume >>>> > >> there are up till 8 tasks ( or 16 ) per VM but how that should >>>> not have >>>> > >> prevented re assignment on a scale up ( as it clearly did ) ? >>>> > >> >>>> > >> The reason I ask is that I plan to run mm2 cluster on k8s and I >>>> want to >>>> > >> make sure that I use the version of JVM that is more docker >>>> friendly >>>> > vis a >>>> > >> vis, how many cpus it believes it has and as explained here >>>> > >> >>>> > >>>> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54 >>>> > >> >>>> > >> >>>> > >> >>>> > >> >>>> > >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan < >>>> ryannedo...@gmail.com> >>>> > >> wrote: >>>> > >> >>>> > >>> What is tasks.max? Consider bumping to something like 48 if you're >>>> > >>> running >>>> > >>> on a dozen nodes. >>>> > >>> >>>> > >>> Ryanne >>>> > >>> >>>> > >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi < >>>> > vishal.santo...@gmail.com >>>> > >>> > >>>> > >>> wrote: >>>> > >>> >>>> > >>> > Hey Ryanne, >>>> > >>> > >>>> > >>> > >>>> > >>> > I see a definite issue. 
I am doing an intense test >>>> and I >>>> > >>> bring >>>> > >>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating >>>> about >>>> > 1200 >>>> > >>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and >>>> are >>>> > >>> > staggered as they come up..I see a fraction of these nodes not >>>> > >>> assigned any >>>> > >>> > replication....There is plenty to go around. ( more then a >>>> couple of >>>> > >>> > thousand partitions ) . is there something I am missing.... >>>> As in >>>> > my >>>> > >>> > current case 5 of the 12 VMs are idle.. >>>> > >>> > >>>> > >>> > Vishal >>>> > >>> > >>>> > >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi < >>>> > >>> vishal.santo...@gmail.com >>>> > >>> > > >>>> > >>> > wrote: >>>> > >>> > >>>> > >>> > > Oh sorry a. COUNTER... is more like it.... >>>> > >>> > > >>>> > >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi < >>>> > >>> vishal.santo...@gmail.com >>>> > >>> > > >>>> > >>> > > wrote: >>>> > >>> > > >>>> > >>> > >> Will do >>>> > >>> > >> One more thing the age/latency metrics seem to be >>>> analogous as >>>> > >>> in >>>> > >>> > >> they seem to be calculated using similar routines. I would >>>> think >>>> > a >>>> > >>> > metric >>>> > >>> > >> tracking >>>> > >>> > >> the number of flush failures ( as a GAUGE ) given >>>> > >>> > >> offset.flush.timeout.ms would be highly beneficial. >>>> > >>> > >> >>>> > >>> > >> Regards.. >>>> > >>> > >> >>>> > >>> > >> >>>> > >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan < >>>> > >>> ryannedo...@gmail.com> >>>> > >>> > >> wrote: >>>> > >>> > >> >>>> > >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers" >>>> > >>> earlier, as >>>> > >>> > >>> the >>>> > >>> > >>> consumer is not created by the worker, but the task. >>>> > >>> > >>> >>>> > >>> > >>> I suppose the put() could be changed to putIfAbsent() here >>>> to >>>> > >>> enable >>>> > >>> > this >>>> > >>> > >>> property to be changed. Maybe submit a PR? 
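Ryanne's put() vs putIfAbsent() suggestion can be illustrated with a small sketch, using plain Python dicts standing in for the Java Properties map in MirrorConnectorConfig ( this is not the actual MM2 code ):

```python
# Sketch of the put() vs putIfAbsent() semantics discussed above, with a
# dict standing in for the connector's consumer properties map.
def build_props_put(user_overrides):
    props = dict(user_overrides)
    props["auto.offset.reset"] = "earliest"  # put(): unconditionally clobbers
    return props

def build_props_put_if_absent(user_overrides):
    props = dict(user_overrides)
    # putIfAbsent(): only sets the default when the user supplied no override
    props.setdefault("auto.offset.reset", "earliest")
    return props

overrides = {"auto.offset.reset": "latest"}  # from mm2.properties
print(build_props_put(overrides)["auto.offset.reset"])            # earliest
print(build_props_put_if_absent(overrides)["auto.offset.reset"])  # latest
```

With putIfAbsent() semantics, the user's `consumer.auto.offset.reset` override survives, while the hard-coded default still applies when nothing is configured.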
>>>> > >>> > >>> >>>> > >>> > >>> Ryanne >>>> > >>> > >>> >>>> > >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi < >>>> > >>> > >>> vishal.santo...@gmail.com> >>>> > >>> > >>> wrote: >>>> > >>> > >>> >>>> > >>> > >>> > Hmm ( I did both ) >>>> > >>> > >>> > >>>> > >>> > >>> > another->another_test.enabled = true >>>> > >>> > >>> > >>>> > >>> > >>> > another->another_test.topics = act_post >>>> > >>> > >>> > >>>> > >>> > >>> > another->another_test.emit.heartbeats.enabled = false >>>> > >>> > >>> > >>>> > >>> > >>> > another->another_test.consumer.auto.offset.reset = latest >>>> > >>> > >>> > >>>> > >>> > >>> > another->another_test.sync.topic.acls.enabled = false >>>> > >>> > >>> > >>>> > >>> > >>> > another.consumer.auto.offset.reset = latest >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > When I grep for the ConsumerConfig ( and there are 8 >>>> instances, >>>> > >>> this >>>> > >>> > >>> topic >>>> > >>> > >>> > has 4 partitions ) >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > [2019-10-17 14:01:21,879] INFO ConsumerConfig values: >>>> > >>> > >>> > >>>> > >>> > >>> > allow.auto.create.topics = true >>>> > >>> > >>> > >>>> > >>> > >>> > auto.commit.interval.ms = 5000 >>>> > >>> > >>> > >>>> > >>> > >>> > *auto.offset.reset* = earliest >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > I am now using the 2.4 branch from kafka trunk >>>> > >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > This code change works and makes sense.. I think all other >>>> > >>> settings >>>> > >>> > >>> will be >>>> > >>> > >>> > fine ( as can be overridden ) but for the 2 below.. 
>>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > *--- >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* >>>> > >>> > >>> > >>>> > >>> > >>> > *+++ >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java* >>>> > >>> > >>> > >>>> > >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig >>>> extends >>>> > >>> > >>> > AbstractConfig { >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>>> > >>>> props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); >>>> > >>> > >>> > >>>> > >>> > >>> > props.put("enable.auto.commit", "false"); >>>> > >>> > >>> > >>>> > >>> > >>> > - props.put("auto.offset.reset", "earliest"); >>>> > >>> > >>> > >>>> > >>> > >>> > + props.put("auto.offset.reset", "latest"); >>>> > >>> > >>> > >>>> > >>> > >>> > return props; >>>> > >>> > >>> > >>>> > >>> > >>> > } >>>> > >>> > >>> > >>>> > >>> > >>> > >>>> > >>> > >>> > Regards. >>>> > >>> > >>> > >>>> > >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan < >>>> > >>> ryannedo...@gmail.com> >>>> > >>> > >>> > wrote: >>>> > >>> > >>> > >>>> > >>> > >>> > > Vishal, you should be able to override the properties >>>> passed >>>> > >>> to the >>>> > >>> > >>> > > internal workers using properties like >>>> > >>> > >>> A->B.consumer.auto.offset.reset or >>>> > >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file. >>>> > >>> Certain >>>> > >>> > >>> > top-level >>>> > >>> > >>> > > properties like tasks.max are honored without the A->B >>>> or A >>>> > >>> prefix, >>>> > >>> > >>> but >>>> > >>> > >>> > > auto.offset.reset is not one of them. 
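The override pattern Ryanne describes looks like this in mm2.properties ( the cluster aliases A and B are placeholders; the working example in this thread used another->another_test ). Note that on the 2.4 branch the connector hard-codes auto.offset.reset=earliest after applying these prefixes, so this particular key may still be clobbered without the put()/putIfAbsent() change discussed above:

```properties
# Flow-scoped consumer override: applies only to the A->B replication flow
A->B.consumer.auto.offset.reset = latest
# Cluster-scoped consumer override: applies to consumers reading from cluster A
A.consumer.auto.offset.reset = latest
# Certain top-level properties such as tasks.max are honored without a prefix,
# but auto.offset.reset is not one of them
tasks.max = 48
```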
>>>> > >>> > >>> > > >>>> > >>> > >>> > > Ryanne >>>> > >>> > >>> > > >>>> > >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi < >>>> > >>> > >>> > vishal.santo...@gmail.com >>>> > >>> > >>> > > > >>>> > >>> > >>> > > wrote: >>>> > >>> > >>> > > >>>> > >>> > >>> > > > Hey Ryanne, >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > How do I override auto.offset.reset = latest for >>>> > >>> consumers >>>> > >>> > >>> through >>>> > >>> > >>> > > > mm2.properties. I have tried straight up . >>>> > auto.offset.reset >>>> > >>> and >>>> > >>> > >>> > > consumer. >>>> > >>> > >>> > > > auto.offset.reset but it defaults to earliest.. I do >>>> have >>>> > a >>>> > >>> > query >>>> > >>> > >>> in >>>> > >>> > >>> > > > another thread but though you might know off hand.. >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > I would imagine there is some way in general of >>>> overriding >>>> > >>> > >>> consumer and >>>> > >>> > >>> > > > producer configs through mm2.properties in MM2 ? >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > Regards. >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi < >>>> > >>> > >>> > > vishal.santo...@gmail.com >>>> > >>> > >>> > > > > >>>> > >>> > >>> > > > wrote: >>>> > >>> > >>> > > > >>>> > >>> > >>> > > > > Thank you so much for all your help. Will keep you >>>> > posted >>>> > >>> on >>>> > >>> > >>> tests I >>>> > >>> > >>> > > > do.. >>>> > >>> > >>> > > > > I hope this is helpful to other folks too.. >>>> > >>> > >>> > > > > >>>> > >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan < >>>> > >>> > >>> ryannedo...@gmail.com> >>>> > >>> > >>> > > > > wrote: >>>> > >>> > >>> > > > > >>>> > >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as >>>> > legacy >>>> > >>> > >>> > > MirrorMaker. 
>>>> > >>> > >>> > > > >> You >>>> > >>> > >>> > > > >> can follow >>>> > >>> https://issues.apache.org/jira/browse/KAFKA-6080 >>>> > >>> > for >>>> > >>> > >>> > > updates >>>> > >>> > >>> > > > >> on >>>> > >>> > >>> > > > >> exactly-once semantics in Connect. >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >> Ryanne >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi < >>>> > >>> > >>> > > > >> vishal.santo...@gmail.com> >>>> > >>> > >>> > > > >> wrote: >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >> > >> You are correct. I'm working on a KIP and >>>> PoC to >>>> > >>> > >>> introduce >>>> > >>> > >>> > > > >> > transactions to >>>> > >>> > >>> > > > >> > >> Connect for this exact purpose :) >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > That is awesome. Any time frame ? >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > In the mean time the SLA as of now >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > 1. It is conceivable that we flush the producer >>>> to the >>>> > >>> > target >>>> > >>> > >>> > > cluster >>>> > >>> > >>> > > > >> but >>>> > >>> > >>> > > > >> > fail to offset commit. If there was a restart >>>> before >>>> > the >>>> > >>> > next >>>> > >>> > >>> > > > successful >>>> > >>> > >>> > > > >> > offset commit, there will be duplicates and a >>>> part >>>> > of >>>> > >>> data >>>> > >>> > >>> is >>>> > >>> > >>> > > > >> replayed ( >>>> > >>> > >>> > > > >> > at least once ) ? >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > 2. The same can be said about partial flushes, >>>> though >>>> > >>> am >>>> > >>> > not >>>> > >>> > >>> sure >>>> > >>> > >>> > > > about >>>> > >>> > >>> > > > >> > how kafka addresses flush ( Is a flush either >>>> success >>>> > >>> or a >>>> > >>> > >>> > failure, >>>> > >>> > >>> > > > and >>>> > >>> > >>> > > > >> > nothing in between ) >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > Thanks.. 
>>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan < >>>> > >>> > >>> > > ryannedo...@gmail.com> >>>> > >>> > >>> > > > >> > wrote: >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making >>>> progress. >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the >>>> > producer >>>> > >>> and >>>> > >>> > >>> > setting >>>> > >>> > >>> > > > the >>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not >>>> atomic OR >>>> > >>> do we >>>> > >>> > >>> use >>>> > >>> > >>> > > > >> > > > transactions here ? >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC >>>> to >>>> > >>> introduce >>>> > >>> > >>> > > > >> transactions >>>> > >>> > >>> > > > >> > to >>>> > >>> > >>> > > > >> > > Connect for this exact purpose :) >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz >>>> num.tasks=4 ), >>>> > >>> and I >>>> > >>> > >>> have >>>> > >>> > >>> > 2 >>>> > >>> > >>> > > > >> topics >>>> > >>> > >>> > > > >> > > with >>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as >>>> in >>>> > >>> there >>>> > >>> > are >>>> > >>> > >>> 4 >>>> > >>> > >>> > > > consumer >>>> > >>> > >>> > > > >> > > groups >>>> > >>> > >>> > > > >> > > > ( on CG per thread ) ... >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > Some details here: >>>> > >>> > >>> > > > >> > > - tasks.max controls the maximum number of >>>> tasks >>>> > >>> created >>>> > >>> > per >>>> > >>> > >>> > > > Connector >>>> > >>> > >>> > > > >> > > instance. 
Both MirrorSourceConnector and >>>> > >>> > >>> > MirrorCheckpointConnector >>>> > >>> > >>> > > > >> will >>>> > >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but >>>> > >>> > >>> > > > MirrorHeartbeatConnector >>>> > >>> > >>> > > > >> > only >>>> > >>> > >>> > > > >> > > ever creates a single task. Moreover, there >>>> cannot >>>> > be >>>> > >>> more >>>> > >>> > >>> tasks >>>> > >>> > >>> > > > than >>>> > >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or >>>> > >>> consumer >>>> > >>> > >>> groups >>>> > >>> > >>> > > (for >>>> > >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two >>>> > topics >>>> > >>> with >>>> > >>> > >>> one >>>> > >>> > >>> > > > >> partition >>>> > >>> > >>> > > > >> > > each and 1 consumer group total, you'll have >>>> two >>>> > >>> > >>> > > > MirrorSourceConnector >>>> > >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and >>>> one >>>> > >>> > >>> > > > >> > MirrorCheckpointConnector >>>> > >>> > >>> > > > >> > > tasks, for a total of four. And that's in one >>>> > >>> direction >>>> > >>> > >>> only: if >>>> > >>> > >>> > > you >>>> > >>> > >>> > > > >> have >>>> > >>> > >>> > > > >> > > multiple source->target herders enabled, each >>>> will >>>> > >>> create >>>> > >>> > >>> tasks >>>> > >>> > >>> > > > >> > > independently. >>>> > >>> > >>> > > > >> > > - There are no consumer groups in MM2, >>>> technically. >>>> > >>> The >>>> > >>> > >>> Connect >>>> > >>> > >>> > > > >> framework >>>> > >>> > >>> > > > >> > > uses the Coordinator API and internal topics to >>>> > divide >>>> > >>> > tasks >>>> > >>> > >>> > among >>>> > >>> > >>> > > > >> > workers >>>> > >>> > >>> > > > >> > > -- not a consumer group per se. 
The MM2 >>>> connectors >>>> > >>> use the >>>> > >>> > >>> > > assign() >>>> > >>> > >>> > > > >> API, >>>> > >>> > >>> > > > >> > > not the subscribe() API, so there are no >>>> consumer >>>> > >>> groups >>>> > >>> > >>> there >>>> > >>> > >>> > > > >> either. In >>>> > >>> > >>> > > > >> > > fact, they don't commit() either. This is >>>> nice, as >>>> > it >>>> > >>> > >>> > eliminates a >>>> > >>> > >>> > > > >> lot of >>>> > >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker >>>> has been >>>> > >>> > plagued >>>> > >>> > >>> > with. >>>> > >>> > >>> > > > >> With >>>> > >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of >>>> > >>> workers >>>> > >>> > >>> changes >>>> > >>> > >>> > or >>>> > >>> > >>> > > > >> when >>>> > >>> > >>> > > > >> > the >>>> > >>> > >>> > > > >> > > assignments change (e.g. new topics are >>>> discovered). >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > Ryanne >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal >>>> Santoshi < >>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com> >>>> > >>> > >>> > > > >> > > wrote: >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > > > Hey Ryanne, >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > The test was on topics that had a >>>> 7 day >>>> > >>> > >>> retention. >>>> > >>> > >>> > > > Which >>>> > >>> > >>> > > > >> > > > generally implies that the batch size for >>>> flush is >>>> > >>> > pretty >>>> > >>> > >>> > high ( >>>> > >>> > >>> > > > >> till >>>> > >>> > >>> > > > >> > the >>>> > >>> > >>> > > > >> > > > consumption becomes current ). 
The >>>> > >>> > >>> offset.flush.timeout.ms >>>> > >>> > >>> > > > >> defaults >>>> > >>> > >>> > > > >> > to >>>> > >>> > >>> > > > >> > > 5 >>>> > >>> > >>> > > > >> > > > seconds and the code will not send in the >>>> offsets >>>> > >>> if the >>>> > >>> > >>> flush >>>> > >>> > >>> > > is >>>> > >>> > >>> > > > >> not >>>> > >>> > >>> > > > >> > > > complete. Increasing that time out did solve >>>> the >>>> > >>> "not >>>> > >>> > >>> sending >>>> > >>> > >>> > > the >>>> > >>> > >>> > > > >> > offset >>>> > >>> > >>> > > > >> > > to >>>> > >>> > >>> > > > >> > > > topic" issue. >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) ) >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing the >>>> flushing the >>>> > >>> > >>> producer and >>>> > >>> > >>> > > > >> setting >>>> > >>> > >>> > > > >> > > the >>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not >>>> atomic OR >>>> > >>> do we >>>> > >>> > >>> use >>>> > >>> > >>> > > > >> > > > transactions here ? >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > 2. 
I see >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> WorkerSourceTask{id=MirrorHeartbeatConnector-0} >>>> > >>> > flushing >>>> > >>> > >>> > 956435 >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} >>>> > >>> flushing >>>> > >>> > >>> 356251 >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> WorkerSourceTask{id=MirrorCheckpointConnector-2} >>>> > >>> > >>> flushing 0 >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> WorkerSourceTask{id=MirrorCheckpointConnector-3} >>>> > >>> > >>> flushing 0 >>>> > >>> > >>> > > > >> > outstanding >>>> > >>> > >>> > > > >> > > > messages >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz >>>> num.tasks=4 ), >>>> > >>> and I >>>> > >>> > >>> have >>>> > >>> > >>> > 2 >>>> > >>> > >>> > > > >> topics >>>> > >>> > >>> > > > >> > > with >>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as >>>> in >>>> > >>> there >>>> > >>> > are >>>> > >>> > >>> 4 >>>> > >>> > >>> > > > consumer >>>> > >>> > >>> > > > >> > > groups >>>> > >>> > >>> > > > >> > > > ( on CG per thread ) ... >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > THANKS A LOT >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > Vishal. 
>>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan >>>> < >>>> > >>> > >>> > > > ryannedo...@gmail.com >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> > > > wrote: >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > > > > timed out >>>> > >>> > >>> > > > >> > > > > while waiting for producer to flush >>>> outstanding >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if >>>> Connect >>>> > was >>>> > >>> > >>> unable to >>>> > >>> > >>> > > > send >>>> > >>> > >>> > > > >> > > records >>>> > >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if >>>> > >>> > >>> > min.in-sync.replicas >>>> > >>> > >>> > > > were >>>> > >>> > >>> > > > >> > > > > misconfigured. Given some data seems to >>>> arrive, >>>> > >>> it's >>>> > >>> > >>> > possible >>>> > >>> > >>> > > > that >>>> > >>> > >>> > > > >> > > > > everything is configured correctly but >>>> with too >>>> > >>> much >>>> > >>> > >>> latency >>>> > >>> > >>> > > to >>>> > >>> > >>> > > > >> > > > > successfully commit within the default >>>> timeouts. >>>> > >>> You >>>> > >>> > may >>>> > >>> > >>> > want >>>> > >>> > >>> > > to >>>> > >>> > >>> > > > >> > > increase >>>> > >>> > >>> > > > >> > > > > the number of tasks substantially to >>>> achieve >>>> > more >>>> > >>> > >>> > parallelism >>>> > >>> > >>> > > > and >>>> > >>> > >>> > > > >> > > > > throughput. >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > Ryanne >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal >>>> Santoshi < >>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > wrote: >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > > Aah no.. this is more to it. 
Not sure if related to the above.

https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114

is timing out based on

https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133

[2019-10-14 18:55:20,820] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 36478 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:423)

On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi < vishal.santo...@gmail.com > wrote:

> I think this might be it.. could you confirm? It seems to be on the path to committing the offsets, but I'm not sure...
>
> [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
>
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>         at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> >>>> > >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >>>> > >>> > >>> > > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >>>> > >>> > >>> > > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > >>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >>>> > >>> > >>> > > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > >>>> > >>> > >>> > > > >> > > 
>>>> > >>> > >>> > > > >> > >>>> > >>> > >>> > > > >> >>>> > >>> > >>> > > > >>>> > >>> > >>> > > >>>> > >>> > >>> > >>>> > >>> > >>> >>>> > >>> > >>>> > >>> >>>> > >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > at >>>> > >>> java.lang.Thread.run(Thread.java:748) >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > Caused by: >>>> > >>> > >>> > > > >> > > >>>> > >>> org.apache.kafka.common.errors.SecurityDisabledException: >>>> > >>> > >>> > > > >> > > > No >>>> > >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne >>>> > Dolan >>>> > >>> < >>>> > >>> > >>> > > > >> > > ryannedo...@gmail.com >>>> > >>> > >>> > > > >> > > > > >>>> > >>> > >>> > > > >> > > > > > > wrote: >>>> > >>> > >>> > > > >> > > > > > > >>>> > >>> > >>> > > > >> > > > > > >> > I do not have a single record in the >>>> > >>> offsets >>>> > >>> > >>> topic >>>> > >>> > >>> > > > >> > > > > > >> >>>> > >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are >>>> > correct >>>> > >>> > that >>>> > >>> > >>> > > without >>>> > >>> > >>> > > > >> > records >>>> > >>> > >>> > > > >> > > > in >>>> > >>> > >>> > > > >> > > > > > that >>>> > >>> > >>> > > > >> > > > > > >> topic, MM2 will restart from >>>> EARLIEST. The >>>> > >>> > offsets >>>> > >>> > >>> > should >>>> > >>> > >>> > > > be >>>> > >>> > >>> > > > >> > > stored >>>> > >>> > >>> > > > >> > > > > > >> periodically and whenever the >>>> connectors >>>> > >>> > gracefully >>>> > >>> > >>> > > > shutdown >>>> > >>> > >>> > > > >> or >>>> > >>> > >>> > > > >> > > > > restart. 
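[Editor's note] The two errors quoted above point at two distinct worker settings. As a sketch only (setting names are taken from the Connect worker and MM2/KIP-382 configuration docs and should be verified against the build in use), a mm2.properties fragment addressing both might look like:

```properties
# Sketch, not a recommendation -- verify names against your Kafka build.

# Give the producer more time to drain before the offset flush gives up
# (the Connect worker default for this timeout is 5000 ms):
offset.flush.timeout.ms = 60000

# Skip ACL syncing entirely when the source broker has no Authorizer
# configured, avoiding the SecurityDisabledException above:
sync.topic.acls.enabled = false
```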
>>>>> Is it possible the topics don't have required ACLs or something? Also
>>>>> note: Connect wants the offsets topic to have a large number of
>>>>> partitions and to be compacted. Though I can't imagine either would
>>>>> prevent commits from being sent.
>>>>>
>>>>> Ryanne
>>>>>
>>>>> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> 2nd/restore issue (I think I need to solve the offsets-topic issue
>>>>>> before I go after the scale-up-and-down issue).
>>>>>>
>>>>>> As you had indicated, I went ahead and created the offsets topic.
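[Editor's note] One way to check whether commits are actually landing is to consume the offsets topic and decode the records. A minimal sketch follows; the key/value shapes are illustrative of what Connect's JSON internal converter writes for MM2's MirrorSourceTask, and the field names ("cluster", "topic", "partition", "offset") are assumptions to verify against a real record:

```python
import json

def parse_connect_offset(key_bytes, value_bytes):
    """Decode one record from a Kafka Connect offsets topic.

    Connect stores keys as a JSON array [connector_name, source_partition]
    and values as the JSON source-offset map; a null value is a tombstone.
    """
    connector, partition = json.loads(key_bytes)
    offset = json.loads(value_bytes) if value_bytes is not None else None
    return connector, partition, offset

# Example record shaped like what MM2 might write (illustrative values):
key = b'["MirrorSourceConnector", {"cluster": "s8k", "topic": "act_search_page", "partition": 0}]'
value = b'{"offset": 4166842}'
connector, partition, offset = parse_connect_offset(key, value)
print(connector, partition["topic"], offset["offset"])
```

An empty offsets topic, as reported below, means there is simply nothing for this to decode, which is consistent with the restart-from-EARLIEST behavior.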
>>>>>> The status of the (destination) cluster is thus (from Kafka Manager
>>>>>> at kafka-manager.bf2.tumblr.net):
>>>>>>
>>>>>> Topic                          Partitions  Brokers  Spread %  Skew %  Leader Skew %  Replicas  Under Replicated %  Msg/Sec   Summed Recent Offsets
>>>>>> s8k.checkpoints.internal       1           3        60        0       0              3         0                   0.00      0
>>>>>> s8k.act_search_page            1           3        60        0       0              3         0                   6675.30   4,166,842
>>>>>> s8k.act_reach                  1           3        60        0       0              3         0                   20657.92  11,579,529
>>>>>> mm2-status.s8k.internal        5           5        100       0       0              3         0                   0.00      10
>>>>>> mm2-offsets.s8k_test.internal  1           3        60        0       0              3         0                   0.00      0
>>>>>> mm2-offsets.s8k.internal       1           3        60        0       0              3         0                   0.00      0
>>>>>> mm2-configs.s8k.internal       1           3        60        0       0              3         0                   0.00      13
>>>>>>
>>>>>> You can see that we have the 5 internal topics (I created both the
>>>>>> offsets topics, to be safe, for the config below):
>>>>>>
>>>>>> clusters = s8k, s8k_test
>>>>>>
>>>>>> s8k.bootstrap.servers = .....
>>>>>> s8k_test.bootstrap.servers = ......
>>>>>>
>>>>>> # only allow replication dr1 -> dr2
>>>>>> s8k->s8k_test.enabled = true
>>>>>> s8k->s8k_test.topics = act_search_page|act_reach
>>>>>> s8k->s8k_test.emit.heartbeats.enabled = false
>>>>>>
>>>>>> s8k_test->s8k.enabled = false
>>>>>> s8k_test->s8k.emit.heartbeats.enabled = false
>>>>>>
>>>>>> s8k_test.replication.factor = 3
>>>>>> s8k.replication.factor = 3
>>>>>> offsets.storage.replication.factor = 3
>>>>>> replication.factor = 3
>>>>>> replication.policy.separator = .
>>>>>> tasks.max = 4
>>>>>>
>>>>>> What seems strange is that I do not have a single record in the
>>>>>> offsets topic. Is that normal? I would imagine that without a record
>>>>>> there is no way that a restore would happen, and that is obvious when
>>>>>> I restart the mm2 instance. Find the screenshot attached. In essence,
>>>>>> the latency avg lag is reset when the mm2 instance is restarted,
>>>>>> indicating no restore but a restart from EARLIEST.
>>>>>> I must be missing something simple.
>>>>>>
>>>>>> On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>
>>>>>>> Vishal, the first issue is easy: you must set tasks.max to something
>>>>>>> above 1 (the default) in order to achieve any parallelism. This
>>>>>>> property is passed along to the internal Connect workers. It's
>>>>>>> unfortunate that Connect is not smart enough to default this property
>>>>>>> to the number of workers. I suspect that will improve before long.
>>>>>>>
>>>>>>> For the second issue, is it possible you are missing the offsets
>>>>>>> topic? It should exist alongside the config and status topics.
>>>>>>> Connect should create this topic, but there are various reasons this
>>>>>>> can fail, e.g. if the replication factor is misconfigured. You can
>>>>>>> try creating this topic manually or changing
>>>>>>> offsets.storage.replication.factor.
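[Editor's note] Ryanne's two suggestions each translate to one line of mm2.properties. A hedged sketch (the values are examples, not recommendations):

```properties
# Parallelism: the default tasks.max of 1 serializes all replication
# onto a single task, regardless of how many worker nodes exist.
tasks.max = 4

# The internal offsets topic is auto-created with this replication
# factor; if it exceeds the destination broker count, creation can fail
# and offsets will never be stored.
offsets.storage.replication.factor = 3
```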
>>>>>>> Ryanne
>>>>>>>
>>>>>>> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a
>>>>>>>> guide, I have built from source the origin/KIP-382 branch of
>>>>>>>> https://github.com/apache/kafka.git.
>>>>>>>>
>>>>>>>> I am seeing 2 issues:
>>>>>>>>
>>>>>>>> * I brought up 2 processes on 2 different nodes (they are actually
>>>>>>>> pods on k8s, but that should not matter). They share the
>>>>>>>> mm2.properties file and are replicating (1-way) 3 topics with 8
>>>>>>>> partitions in total. That seems to be the way to create a standalone
>>>>>>>> mm2 cluster. I do not, however, see (at least the mbeans do not
>>>>>>>> show) any attempt to rebalance. The
>>>>>>>> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>>>>>>> mbeans are all on a single node.
>>>>>>>>
>>>>>>>> * I restart the processes on the 2 nodes (hard stop and start). The
>>>>>>>> offsets for replication seem to be reset to the earliest, as if it
>>>>>>>> is a brand new mirroring. It is also obvious from the
>>>>>>>> "record-age-ms-avg|replication-latency-ms-avg" metrics, which I
>>>>>>>> track through the restart.
>>>>>>>>
>>>>>>>> This implies that
>>>>>>>>
>>>>>>>> 1. Load balancing by rebalancing is not working. I cannot scale up
>>>>>>>> or down by adding nodes to the mm2 cluster or removing them.
>>>>>>>>
>>>>>>>> 2. Restore on a mirror is not working. If the MM2 cluster is brought
>>>>>>>> down, it does not start mirroring from the last known state. I see
>>>>>>>> the state/config topics etc. created as expected.
>>>>>>>>
>>>>>>>> The mm2.properties is pretty minimal:
>>>>>>>>
>>>>>>>> clusters = a, b
>>>>>>>>
>>>>>>>> a.bootstrap.servers = k.....
>>>>>>>> b.bootstrap.servers = k.....
>>>>>>>>
>>>>>>>> # only allow replication dr1 -> dr2
>>>>>>>> a->b.enabled = true
>>>>>>>> a->b.topics = act_search_page
>>>>>>>> a->b.emit.heartbeats.enabled = false
>>>>>>>>
>>>>>>>> b->a..enabled = false
>>>>>>>> b->a.emit.heartbeats.enabled = false
>>>>>>>>
>>>>>>>> What do you think is the issue?
>>>>>>>>
>>>>>>>> Thanks
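[Editor's note] Two small things stand out in the minimal config above, given Ryanne's reply in the thread: `b->a..enabled` carries a doubled dot, and `tasks.max` is left at its default of 1, which serializes replication onto a single task. A cleaned-up sketch of the same intent:

```properties
clusters = a, b

a.bootstrap.servers = k.....
b.bootstrap.servers = k.....

# only allow replication a -> b
a->b.enabled = true
a->b.topics = act_search_page
a->b.emit.heartbeats.enabled = false

# doubled dot removed from the original b->a..enabled line
b->a.enabled = false
b->a.emit.heartbeats.enabled = false

# default is 1, which prevents any parallelism across workers
tasks.max = 4
```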