[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868842#comment-17868842 ]
George Yang commented on KAFKA-17186:
-------------------------------------

Thanks [~gharris1727] again. Sorry for the confusion: by "affect the old Kafka cluster" I simply meant the two Kafka clusters, one located in each data center. Please ignore the word "old".

To test the rebalance delay, I tried setting {{scheduled.rebalance.max.delay.ms=20000}} in {{connect-mirror-maker.properties}}. However, the log file {{mirrormaker.out}} still showed the default value, {{scheduled.rebalance.max.delay.ms = 300000}}, in the DistributedConfig section.

I then tried setting it as follows:
A. {{distributed.scheduled.rebalance.max.delay.ms=20000}}
B. {{distributed.scheduled.rebalance.max.delay.ms=20000}}

Nevertheless, {{mirrormaker.out}} still shows the default value of 300000. How should I configure this parameter so that it takes effect?

> Cannot receive message after stopping Source Mirror Maker 2
> -----------------------------------------------------------
>
>                 Key: KAFKA-17186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17186
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.1
>         Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>            Reporter: George Yang
>            Priority: Major
>         Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Deploy nodes 1, 2, and 3 in Data Center A, with the MM2 service deployed on
> node 1. Deploy node 1 in Data Center B, with an MM2 service also deployed on
> node 1. A service on node 1 in Data Center A acts as a producer sending
> messages to the `myTest` topic, and a service in Data Center B acts as a
> consumer listening to `A.myTest`.
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer
> in Data Center B stops receiving messages.
> Even after I restart MM2 in Data Center A, the consumer in Data Center B
> still does not receive messages until approximately 5 minutes later, when a
> rebalance occurs; at that point it begins receiving messages again.
>
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully synced group in generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
>
> [Configuration]
> name=MCS-MM2
> clusters = A, B
> A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
> B.bootstrap.servers = [kafka]:[port]
> # enable and configure individual replication flows
> A->B.enabled = true
> A->B.topics = .*
> B->A.enabled = true
> B->A.topics = .*
> replication.factor=2
> tasks.max=3
> emit.checkpoints.interval.seconds=5
> A.producer.acks=all
> A.producer.batch.size=50000
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> A.consumer.enable.auto.commit=true
> B.consumer.enable.auto.commit=true
> A.consumer.max.poll.interval.ms=20000
> B.consumer.max.poll.interval.ms=20000
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> refresh.topics.enabled=true
> refresh.topics.interval.seconds=5
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=5

--
This message was sent by Atlassian Jira
(v8.20.10#820010)