[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848767#comment-17848767
 ] 

Thanos Athanasopoulos edited comment on KAFKA-16798 at 5/22/24 10:38 PM:
-------------------------------------------------------------------------

[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

 

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

 

Thank you !


was (Author: JIRAUSER305481):
[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will go for some reading in the resources you shared and then some performance 
tuning in real world scenarios.

Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16798
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16798
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.0
>            Reporter: Thanos Athanasopoulos
>            Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@xxxxx-xxxxxxxxx ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,857] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,880] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,886] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,897] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,855] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,860] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,883] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,888] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,901] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,860] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,863] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,886] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,893] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,904] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,863] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,866] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,888] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,895] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,906] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,866] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,869] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,898] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,910] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,869] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,872] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,892] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,900] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,915] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> {noformat}
>  
> MM2 Configuration:
>  
> {noformat}
> clusters = cl01,cl02
> cl01.bootstrap.servers = 
> cl01-kafka-01:9092,cl01-kafka-02:9092,cl01-kafka-03:9092
> cl02.bootstrap.servers = 
> cl02-kafka-01:9092,cl02-kafka-02:9092,cl02-kafka-03:9092
> ##############################
> ### First cluster config
> ##############################
> # source.cluster.alias = cl01
> # target.cluster.alias = cl02
> cl01->cl02.enabled=true
> cl01->cl02.enable.auto.commit=true
> cl01->cl02.sync.group.offsets.enabled=true
> cl01->cl02.refresh.groups.enabled=true
> cl01->cl02.refresh.topics.enabled=true
> cl01->cl02.sync.topics.enabled=true
> cl01->cl02.sync.topic.acls.enabled=false
> cl01->cl02.sync.acls.enabled=false
> cl01->cl02.emit.heartbeats.enabled=true
> cl01->cl02.emit.checkpoints.enabled=true
> cl01->cl02.sync.topic.configs.enabled=true
> cl01->cl02.emit.offset-syncs.enabled=true
> cl01.consumer.auto.offset.reset=earliest
> ##############################
> ### Second cluster (DR) config
> ##############################
> # source.cluster.alias = cl01
> # target.cluster.alias = cl02
> cl02->cl01.enabled=false
> cl02->cl01.enable.auto.commit=false
> cl02->cl01.emit.offset-syncs.enabled=false
> cl02->cl01.sync.group.offsets.enabled=false
> cl02->cl01.sync.topic.acls.enabled=false
> cl02->cl01.sync.acls.enabled=false
> cl02->cl01.refresh.groups.enabled=true
> cl02->cl01.refresh.topics.enabled=true
> cl02->cl01.sync.topics.enabled=true
> cl02->cl01.emit.heartbeats.enabled=true
> cl02->cl01.emit.checkpoints.enabled=true
> cl02->cl01.sync.topic.configs.enabled=true
> #cl02.consumer.auto.offset.reset=earliest
> ##############################
> ### CC Configs
> ##############################
> group.id=crosscluster-consumer-group
> client.id=crosscluster-clientgroups=.*
> #groups='.*cc-consumer-group.*'
> offset-syncs.topic.whitelist='cl.*\.test-topic'
> #topics='cl.*\.test-topic'
> topics.blacklist='[.*[\-\.]internal, .*\.replica, __.*]'
> key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter = 
> org.apache.kafka.connect.converters.ByteArrayConverteremit.heartbeats.interval.seconds=5
> emit.checkpoints.interval.seconds=5
> emit.offset-syncs.interval.seconds=2
> auto.commit.interval.seconds=5
> checkpoint.interval.seconds=5
> consumer.commit.interval.seconds=5
> offset.flush.interval.seconds=5
> sync.topics.interval.seconds=2
> sync.group.offsets.interval.seconds=2
> sync.acls.interval.seconds=5
> refresh.groups.interval.seconds=5
> refresh.topics.interval.seconds=5
> max.poll.interval.seconds=5
> heartbeats.topic.retention.hours=1
> checkpoints.topic.retention.hours=1
> offset.syncs.topic.retention.hours=1
> errors.log.enable=true
> errors.log.include.messages=true
> dedicated.mode.enable.internal.rest=true
> cluster.producer.enable.idempotence=true
> allow.auto.create.topics=truetasks.max=48
> replication.factor=1
> config.storage.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> offset-syncs.topic.replication.factor=3
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offsets.storage.replication.factor=1
> {noformat}
>  
> I see that {color:#cccccc}*offset.flush.interval.seconds* is always 60000 
> whatever change I make in the MM2 properties file and II suspect that this is 
> the culprit but needs some more investigation.{color}
> {noformat}
> docker logs cc-mm 2>&1 -f | grep -i "offset.flush.interval"
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
> {noformat}
> {color:#cccccc} 
> The docker-compose service spec{color}
> {noformat}
>   cc-mm:
>     image: 'bitnami/kafka:3.7.0'
>     container_name: cc-mm
>     hostname: cc-mm
>     ports:
>       - "9912:8083"
>     command: /opt/bitnami/kafka/bin/connect-mirror-maker.sh 
> /opt/bitnami/kafka/config/mm2.properties
>     environment:
>       <<: *kafka-cc-env-common
>     volumes:
>       - ./files/configs-compose/jmx-exporter:/opt/jmx-exporter
>       - 
> ./files/configs-compose/mm2/single-cluster/cc-single-mm.properties:/opt/bitnami/kafka/config/mm2.properties
>       - 
> ./files/configs-compose/kafka/log4j.properties:/opt/bitnami/kafka/config/log4j.properties:ro
>     networks:
>       - kafka-1
>       - kafka-2
>     depends_on:
>       cl01-kafka-01:
>         condition: service_healthy
>       cl01-kafka-02:
>         condition: service_healthy
>       cl01-kafka-03:
>         condition: service_healthy
>       cl02-kafka-01:
>         condition: service_healthy
>       cl02-kafka-02:
>         condition: service_healthy
>       cl02-kafka-03:
>         condition: service_healthy
>         {noformat}
> {color:#cccccc} {color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to