Confluent replicator keeps restarting consumers
Hi everyone,

I'm trying to migrate from one cluster to another hosted on Confluent Cloud. I'm using the trial version of Confluent Replicator, and it seems to keep restarting its consumers. I know that Replicator isn't part of Kafka itself, but I think the error might apply to any consumer, not just the one used by Replicator.

I've created a consumer.properties file like this:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
    retry.backoff.ms=500
    offset.flush.timeout.ms=30
    max.poll.interval.ms=30
    max.poll.records=250
    group.instance.id=replicator-0
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";

and a producer.properties like this:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
    offset.flush.timeout.ms=5
    buffer.memory=335544
    max.poll.interval.ms=30
    max.poll.records=250
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";

(I've tried tuning the properties between bootstrap.servers and sasl.jaas.config a bit, but the behavior doesn't change.)

I run the command like this:

    confluent-platform/bin/replicator --cluster.id replicator --consumer.config consumer.properties --producer.config producer.properties --topic.regex '.*'

and what I see is sections of logs like this, repeated continuously:

    [2021-04-09 20:39:08,967] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [xxx.eu-west-1.aws.confluent.cloud:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = replicator-0
        client.rack =
        connections.max.idle.ms = 54
        default.api.timeout.ms = 6
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = replicator
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 30
        max.poll.records = 250
        metadata.max.age.ms = 30
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 3
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 3
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 6
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 1
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 1
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
     (org.apache.kafka.clients.consumer.ConsumerConfig:361)
    [2021-04-09 20:39:08,969] WARN The configuration 'offset.flush.timeout.ms' was supplied but isn't a known config. (org.apache.kafka.clients.con
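The WARN line above suggests that some keys in the two files are not settings the client they were passed to understands. As a rough aid, here is a minimal sketch that flags such keys before starting Replicator. The three key sets are a small hand-picked subset chosen for this post (not a complete list), and the helper names `parse_properties` and `misplaced_keys` are hypothetical.

```python
# Illustrative subsets only; these are NOT complete lists of Kafka settings.
CONSUMER_ONLY = {"max.poll.interval.ms", "max.poll.records", "group.instance.id"}
PRODUCER_ONLY = {"buffer.memory"}
CONNECT_WORKER_ONLY = {"offset.flush.timeout.ms"}

CONSUMER_PROPS = """
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
retry.backoff.ms=500
offset.flush.timeout.ms=30
max.poll.interval.ms=30
max.poll.records=250
group.instance.id=replicator-0
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
"""

PRODUCER_PROPS = """
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
offset.flush.timeout.ms=5
buffer.memory=335544
max.poll.interval.ms=30
max.poll.records=250
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may contain '=' themselves.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def misplaced_keys(props, role):
    """Return keys that belong to a different component than `role`."""
    if role == "consumer":
        foreign = PRODUCER_ONLY | CONNECT_WORKER_ONLY
    elif role == "producer":
        foreign = CONSUMER_ONLY | CONNECT_WORKER_ONLY
    else:
        raise ValueError("role must be 'consumer' or 'producer'")
    return sorted(key for key in props if key in foreign)


print(misplaced_keys(parse_properties(CONSUMER_PROPS), "consumer"))
print(misplaced_keys(parse_properties(PRODUCER_PROPS), "producer"))
```

This only reports keys that look out of place; it does not say anything about whether the remaining values are sensible.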
Re: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hello Samruddhi,

If you want to migrate from MM1 to MM2, that tool will not work, since it was made for Confluent Replicator to MM2, for experimental purposes. But the high-level idea should still work, since some people have solved the Mirror Maker migration problem by using the method mentioned in the article.

https://lists.apache.org/thread.html/r928922036031df0db11a873ac076dae071a57a7f638bcb5911d34580%40%3Cusers.kafka.apache.org%3E

On 2021/04/05 09:24:28, Samruddhi Naik wrote:
> Hello Ning,
>
> I was following this email thread and read through your article. I tried executing your script, which I found here: https://github.com/ning2008wisc/mm2-migration, but to my dismay it did not work. My colleague and I would really like some help on this one. When we transition from mm1 to mm2, we still see messages that were already replicated being replicated again to the same topic. Any help will be appreciated. Looking forward to your response.
>
> Thanks,
> Samruddhi
RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hello Ning,

I was following this email thread and read through your article. I tried executing your script, which I found here: https://github.com/ning2008wisc/mm2-migration, but to my dismay it did not work. My colleague and I would really like some help on this one. When we transition from mm1 to mm2, we still see messages that were already replicated being replicated again to the same topic. Any help will be appreciated. Looking forward to your response.

Thanks,
Samruddhi
Re: RE: RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Great, that sounds like a smart way of bridging Replicator and MM2. It seems that even though the consumer group can be the same for Replicator and MM2, the storage format of the offsets is still a little different, so we need an "adapter" anyway. You just need to monitor that the "adapter" is constantly running and not lagging behind too much :) Happy to hear any more interesting stories from the migration.

On 2020/12/21 05:21:39, wrote:
> Hi Ning and all,
>
> We found a crude way to solve this issue. Below are the high-level steps:
>
> 1. Read the messages from Replicator's internal topic for storing offsets [connect-offsets]. This topic stores, as key:value pairs, the offsets for all topics that are being replicated. For example:
>
>    Key:   ["replicator-group",{"topic":"TEST","partition":0}]
>    Value: {"offset":24}
>
>    For each topic and partition, whenever a new message is replicated, a new message with the same key but an increased offset is produced to the connect-offsets topic.
>
> 2. Convert the key of this message to the Mirror Maker 2 format and produce it to the internal topic of Mirror Maker 2. [You can change the internal topics in the mirrormaker2-connect-distributed.properties file.] The format for the Mirror Maker internal topic is:
>
>    Key:   ["mirrormaker-group",{"cluster":"","partition":0,"topic":"TEST"}]
>    Value: {"offset":24}
>
> 3. After posting the message, restart Mirror Maker. It will read the internal topic to get the latest offset from which each topic has to be replicated, and this way we can ensure that no duplicate messages are replicated.
>
> This has been tested and found to be working as expected.
>
> Thanks and regards,
> Amit
RE: RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi Ning and all,

We found a crude way to solve this issue. Below are the high-level steps:

1. Read the messages from Replicator's internal topic for storing offsets [connect-offsets]. This topic stores, as key:value pairs, the offsets for all topics that are being replicated. For example:

   Key:   ["replicator-group",{"topic":"TEST","partition":0}]
   Value: {"offset":24}

   For each topic and partition, whenever a new message is replicated, a new message with the same key but an increased offset is produced to the connect-offsets topic.

2. Convert the key of this message to the Mirror Maker 2 format and produce it to the internal topic of Mirror Maker 2. [You can change the internal topics in the mirrormaker2-connect-distributed.properties file.] The format for the Mirror Maker internal topic is:

   Key:   ["mirrormaker-group",{"cluster":"","partition":0,"topic":"TEST"}]
   Value: {"offset":24}

3. After posting the message, restart Mirror Maker. It will read the internal topic to get the latest offset from which each topic has to be replicated, and this way we can ensure that no duplicate messages are replicated.

This has been tested and found to be working as expected.
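The key conversion described above can be sketched as follows. This is only an illustration of the key rewrite, assuming the two key layouts are exactly as shown in this message; the function name `replicator_key_to_mm2_key` and its parameters are hypothetical, and reading from or producing to the internal topics is left out.

```python
import json


def replicator_key_to_mm2_key(raw_key, mm2_group, source_cluster_alias):
    """Rewrite a Replicator connect-offsets key into the MM2 key layout shown above.

    raw_key is the JSON text of the Replicator key, for example
    ["replicator-group",{"topic":"TEST","partition":0}].
    mm2_group and source_cluster_alias are assumptions supplied by the caller.
    """
    _replicator_group, source_partition = json.loads(raw_key)
    # Field order follows the example in the post (cluster, partition, topic),
    # in case the offset lookup is sensitive to the serialized form.
    mm2_partition = {
        "cluster": source_cluster_alias,
        "partition": source_partition["partition"],
        "topic": source_partition["topic"],
    }
    # Compact separators reproduce the spacing-free JSON used in the post.
    return json.dumps([mm2_group, mm2_partition], separators=(",", ":"))


replicator_key = '["replicator-group",{"topic":"TEST","partition":0}]'
offset_value = '{"offset":24}'  # the value is carried over unchanged

mm2_key = replicator_key_to_mm2_key(replicator_key, "mirrormaker-group", "")
print(mm2_key, offset_value)
```

The empty cluster alias mirrors the example key in the post; in a real setup it would presumably be the source cluster alias configured for Mirror Maker 2.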
Thanks and regards,
Amit

-Original Message-
From: Ning Zhang
Sent: Thursday, December 10, 2020 10:40 PM
To: users@kafka.apache.org
Subject: Re: RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0

[External]

Regarding consumer group = null: https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L89 is where the consumer is created in MirrorSourceTask, and people could override any consumer-level config (including group.id) at https://github.com/apache/kafka/blob/b44d32dffedb368b888e3431257d68abb1e62b9f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L249

As you may have tried something like `source.consumer.group.id`, `.consumer.group.id`, or `.group.id`: if none of them work, then we should look further into the source code and see whether your setting is overridden elsewhere.
Re: RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Regarding consumer group = null: https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L89 is where the consumer is created in MirrorSourceTask, and people could override any consumer-level config (including group.id) at https://github.com/apache/kafka/blob/b44d32dffedb368b888e3431257d68abb1e62b9f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L249

As you may have tried something like `source.consumer.group.id`, `.consumer.group.id`, or `.group.id`: if none of them work, then we should look further into the source code and see whether your setting is overridden elsewhere.

On 2020/12/08 06:28:50, wrote:
> Hi Ning,
>
> It did not work. Here are the logs from the replicator and mirror maker 2 respectively:
>
> Replicator:
>
> [2020-12-08 05:11:06,611] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 83 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
> [2020-12-08 05:11:07,113] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 49 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
> [2020-12-08 05:11:07,615] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 53 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-2 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> Mirror Maker 2.0:
>
> [2020-12-08 06:10:51,385] INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 80 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
> [2020-12-08 06:10:51.385] INFO [Consumer clientId= consumer-4, groupId=null] Seeking to offset 49 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
> [2020-12-08 06:10:51.386] INFO [Consumer clientId= consumer-4, groupId=null] Seeking to offset 52 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> You can see that groupId is still null in MM2 and that the offsets are earlier ones, meaning it will again replicate messages that have already been replicated by Replicator.
>
> Thanks and regards,
> Amit
RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi Ning,

It did not work. Here are the logs from the replicator and mirror maker 2 respectively:

Replicator:

[2020-12-08 05:11:06,611] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 83 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
[2020-12-08 05:11:07,113] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 49 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
[2020-12-08 05:11:07,615] INFO [Consumer clientId=onprem-aws-replicator-0, groupId=onprem-aws-replicator] Seeking to offset 53 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-2 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)

Mirror Maker 2.0:

[2020-12-08 06:10:51,385] INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 80 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
[2020-12-08 06:10:51.385] INFO [Consumer clientId= consumer-4, groupId=null] Seeking to offset 49 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
[2020-12-08 06:10:51.386] INFO [Consumer clientId= consumer-4, groupId=null] Seeking to offset 52 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1545)

You can see that groupId is still null in MM2 and that the offsets are earlier ones, meaning it will again replicate messages that have already been replicated by Replicator.

Thanks and regards,
Amit

-Original Message-
From: Ning Zhang
Sent: Monday, December 7, 2020 10:29 PM
To: users@kafka.apache.org
Subject: Re: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0

[External]

Hi Amit,

After looking into it a little, would you mind overriding `connector.consumer.group`? The default consumer group may be called 'connect-MirrorSourceConnector' or similar.
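To compare the two sets of "Seeking to offset" lines in the message above without reading them by eye, a small parser can help. The sketch below uses two log lines copied from the post; the regular expression and the helper name `parse_seek` are assumptions written for this log format only.

```python
import re

# Matches the "groupId=...] Seeking to offset N for partition TOPIC-P" part of a line.
SEEK_RE = re.compile(
    r"groupId=([^\]]+)\] Seeking to offset (\d+) for partition (\S+)-(\d+)"
)


def parse_seek(line):
    """Return (group_id, topic, partition, offset), or None if the line does not match."""
    match = SEEK_RE.search(line)
    if match is None:
        return None
    group, offset, topic, partition = match.groups()
    # The client logs a missing group id as the literal text "null".
    return (None if group == "null" else group, topic, int(partition), int(offset))


replicator_line = (
    "[2020-12-08 05:11:06,611] INFO [Consumer clientId=onprem-aws-replicator-0, "
    "groupId=onprem-aws-replicator] Seeking to offset 83 for partition "
    "ONPREM.AWS.REPLICA.TOPIC.P3R3-0 "
    "(org.apache.kafka.clients.consumer.KafkaConsumer:1545)"
)
mm2_line = (
    "[2020-12-08 06:10:51,385] INFO [Consumer clientId=consumer-4, groupId=null] "
    "Seeking to offset 80 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 "
    "(org.apache.kafka.clients.consumer.KafkaConsumer:1545)"
)

replicator_seek = parse_seek(replicator_line)
mm2_seek = parse_seek(mm2_line)

# If Replicator had already copied everything before its seek offset, the
# difference is the number of records MM2 would copy a second time.
replayed = replicator_seek[3] - mm2_seek[3]
print(replicator_seek, mm2_seek, replayed)
```

For partition 0 this gives a difference of 3 (offsets 80 to 82), which matches the duplication described in the message, under the assumption that both lines refer to the same source partition.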
Re: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi Amit,

After looking into it a little, would you mind overriding `connector.consumer.group`? The default consumer group may be called 'connect-MirrorSourceConnector' or similar.

On 2020/12/07 03:32:30, wrote:
> Hi Ning,
>
> Thank you for the response.
>
> I have probably tried changing every possible consumer group id in every way MM2 can run. But I went through the code, and it seems the consumer is using the assign method, which does not require a group id to be assigned; I also could not find anywhere that the property is read and set.
>
> One crude way we found, which is in testing, is to manually change the offset of the topic in the internal topics that MM2 reads to get the latest offset of the message.
>
> Thanks,
> Amit
>
> -Original Message-
> From: Ning Zhang
> Sent: Monday, December 7, 2020 3:46 AM
> To: users@kafka.apache.org
> Subject: Re: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
>
> [External]
>
> Hi Amit, I guess you may need to override the actual consumer group config (probably not consumer.group.id) that is used in Kafka Connect
>
> On 2020/11/26 06:47:11, wrote:
> > Hi All,
> >
> > We are currently trying to migrate from Confluent Replicator to Apache open-source Mirror Maker v2.0. We are facing an issue where messages that have already been replicated by Replicator are replicated again when Mirror Maker is started on the same topic. This should not happen, as messages are getting duplicated at the target cluster. Here are more details:
> >
> > 1. RCA: Replicator assigns a consumer group for replicating messages. This consumer group maintains the offset of the source topic. But we are not able to assign the same consumer group to the consumer config in Mirror Maker 2.
> >
> > 2. Mirror Maker 1.0: works, as the same consumer group can be assigned in the consumer.properties file, and messages are picked up right where Replicator stopped.
> >
> > 3. Tried configuring source.cluster.consumer.group.id in Mirror Maker 2.0 in all available options (in cluster mode, and in connect-standalone and connect-distributed mode), but Mirror Maker 2.0 assigns the consumer group id as null while replicating messages.
> >
> > Any pointers would be appreciated if anyone has done the same and managed to maintain the same offset with Mirror Maker 2.0.
> >
> > Thanks and regards,
> > Amit
> >
> > This e-mail and any files transmitted with it are for the sole use of the intended recipient(s) and may contain confidential and privileged information. If you are not the intended recipient(s), please reply to the sender and destroy all copies of the original message. Any unauthorized review, use, disclosure, dissemination, forwarding, printing or copying of this email, and/or any action taken in reliance on the contents of this e-mail is strictly prohibited and may be unlawful. Where permitted by applicable law, this e-mail and other e-mail communications sent to and from Cognizant e-mail addresses may be monitored.
RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi Ning,

Thank you for the response.

I have probably tried changing every possible consumer group id in every way MM2 can run. But I went through the code, and it seems the consumer is set up with the assign method, which does not require a group id, and I could not find anywhere that the property is read and set.

One crude way we found, which is currently in testing, is to manually change the offset of the topic in the internal topics that MM2 reads to get the latest offset of the message.

Thanks,
Amit

-----Original Message-----
From: Ning Zhang
Sent: Monday, December 7, 2020 3:46 AM
To: users@kafka.apache.org
Subject: Re: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0

[External]

Hi Amit, I guess you may need to override the actual consumer group config (probably not consumer.group.id) that is used in Kafka Connect

This e-mail and any files transmitted with it are for the sole use of the intended recipient(s) and may contain confidential and privileged information. If you are not the intended recipient(s), please reply to the sender and destroy all copies of the original message. Any unauthorized review, use, disclosure, dissemination, forwarding, printing or copying of this email, and/or any action taken in reliance on the contents of this e-mail is strictly prohibited and may be unlawful. Where permitted by applicable law, this e-mail and other e-mail communications sent to and from Cognizant e-mail addresses may be monitored.
Re: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi Amit,

I guess you may need to override the actual consumer group config (probably not consumer.group.id) that is used in Kafka Connect.

On 2020/11/26 06:47:11, wrote:
> Hi All,
>
> We are currently trying to migrate Confluent replicator to Apache Open Source Mirror Maker v2.0. We are facing an issue where the messages which are already replicated by replicator is getting replicated again when the mirror maker is started on the same topic. This should not happen as messages are getting duplicated at the target cluster. Here are more details:
>
> 1. RCA: replicator assign a consumer group for replicating messages. This consumer group maintains the offset of the source topic. But we are not able to assign same consumer group to the Consumer config in mirror maker 2.
>
> 2. Mirror Maker 1.0 : working as same consumer group can be assigned in consumer.properties file and the messages are picked right after where replicator was stopped.
>
> 3. Tried running and configuring source.cluster.consumer.group.id in mirror maker 2.0 in all available options (in cluster mode, in connect-standalone and connect-distributed mode) but mirror maker 2.0 is assigning consumer group id as null while replicating messages.
>
> Any pointers if anyone has done same and tried to maintain the same offset with mirror maker 2.0.
>
> Thanks and regards,
> Amit
Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0
Hi All,

We are currently trying to migrate from Confluent Replicator to Apache open-source Mirror Maker v2.0. We are facing an issue where messages that were already replicated by Replicator are replicated again when Mirror Maker is started on the same topic. This should not happen, as it duplicates messages at the target cluster. Here are more details:

1. RCA: Replicator assigns a consumer group for replicating messages. This consumer group maintains the offset of the source topic. But we are not able to assign the same consumer group to the consumer config in Mirror Maker 2.

2. Mirror Maker 1.0: this works, as the same consumer group can be assigned in the consumer.properties file and messages are picked up right where Replicator stopped.

3. We tried configuring source.cluster.consumer.group.id in Mirror Maker 2.0 in all available options (in cluster mode, and in connect-standalone and connect-distributed mode), but Mirror Maker 2.0 assigns the consumer group id as null while replicating messages.

Any pointers from anyone who has done the same and managed to maintain the same offset with Mirror Maker 2.0 would be appreciated.

Thanks and regards,
Amit
Re: Confluent Replicator
Not ideal... but that way we can use a single combined development/testing cluster for development and 90% of the flows and the scripts to deploy the environment, and also test the replication configuration etc., while building some experience in how to manage it.

G

On Thu, Feb 20, 2020 at 6:32 AM Peter Bukowinski wrote:
> That is possible as long and you include a topic.rename.format argument in the replication.properties file. The origin and destination cluster configs can point to the same cluster.
>
> See the example here
> https://docs.confluent.io/current/multi-dc-deployments/replicator/replicator-quickstart.html#configure-and-run-replicator
>
> -- Peter

--
You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!
Re: Confluent Replicator
That is possible as long as you include a topic.rename.format argument in the replication.properties file. The origin and destination cluster configs can point to the same cluster.

See the example here:
https://docs.confluent.io/current/multi-dc-deployments/replicator/replicator-quickstart.html#configure-and-run-replicator

-- Peter

> On Feb 19, 2020, at 7:41 PM, George wrote:
>
> Hi all.
>
> is it possible, for testing purposes to replicate topic A from Cluster 1 to topic B on cluster 1/same cluster?
>
> G
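For anyone reading this thread later, here is a minimal sketch of the same-cluster setup described above, assuming the replicator executable is run with three properties files as in the linked quickstart. The address localhost:9092, the whitelist entry A (taken from the question) and the ".replica" suffix are placeholders for illustration, not values from this thread. With this format the copy would be named A.replica rather than B.

```properties
# consumer.properties (origin cluster); localhost:9092 is a placeholder address
bootstrap.servers=localhost:9092

# producer.properties (destination cluster); same address, so both ends are one cluster
bootstrap.servers=localhost:9092

# replication.properties
# Replicate only topic A (the topic name used in the question)
topic.whitelist=A
# Rename on the destination so the copy does not land back on the source topic;
# ".replica" is an arbitrary suffix chosen for this sketch
topic.rename.format=${topic}.replica
```

Without the rename, the source and destination topic names would be identical on the same cluster, which is why the rename format is the piece that makes this work.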
Confluent Replicator
Hi all.

Is it possible, for testing purposes, to replicate topic A from Cluster 1 to topic B on Cluster 1 (the same cluster)?

G
Confluent Replicator
Does anybody have any experience with Confluent Replicator? Has it worked well for you? -Dave This e-mail and any files transmitted with it are confidential, may contain sensitive information, and are intended solely for the use of the individual or entity to whom they are addressed. If you have received this e-mail in error, please notify the sender by reply e-mail immediately and destroy all copies of the e-mail and any attachments.