Hello all,

I am setting up MM2 to replicate messages, consumer groups, and consumer
offsets from a->b, and I believe all three are replicating.  My mm2
properties file is as follows:

```
# specify any number of cluster aliases
clusters = a,b
b.group.id=mm2-request

# replication settings
tasks.max = 24
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
a.max.poll.records = 20000
#a.receive.buffer.bytes = 33554432
#a.send.buffer.bytes = 33554432
#a.max.partition.fetch.bytes = 33554432
#a.message.max.bytes = 37755000
a.compression.type = gzip
#a.max.request.size = 26214400
#a.buffer.memory = 524288000
a.batch.size = 524288

b.max.poll.records = 20000
#b.receive.buffer.bytes = 33554432
#b.send.buffer.bytes = 33554432
#b.max.partition.fetch.bytes = 33554432
#b.message.max.bytes = 37755000
b.compression.type = gzip
#b.max.request.size = 26214400
#b.buffer.memory = 524288000
b.batch.size = 524288

a.bootstrap.servers = aaa.aws.confluent.cloud:9092
a.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="aaa" password="aaa";
a.sasl.mechanism = PLAIN
a.security.protocol = SASL_SSL
a.ssl.endpoint.identification.algorithm = https

b.bootstrap.servers = bbb.aws.confluent.cloud:9092
b.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="bbb" password="bbb";
b.sasl.mechanism = PLAIN
b.security.protocol = SASL_SSL
b.ssl.endpoint.identification.algorithm = https

# enable and configure individual replication flows
a->b.enabled = true

# regex which defines which topics get replicated, e.g. "foo-.*"
# a->b.topics = .*
a->b.topics = order-cmd-request-01

# topic exclusion
topics.blacklist = .*[\\-\\.]internal, .*\\.replica

# group to replicate
groups = .*

# group exclusion
# groups.blacklist = console-consumer-.*

sync.topic.acls.enabled = false
sync.topic.configs.enabled = true
refresh.topics.enabled = false
refresh.topics.interval.seconds = 600

checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3

refresh.groups.enabled = true
refresh.groups.interval.seconds = 600

a->b.sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5
emit.checkpoints.interval.seconds = 5
emit.heartbeats.interval.seconds = 5
offset.translate.method = simple

# sync acl
# sync.topic.acls.enabled = false

# enable heartbeat
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true

# Setting replication factor of newly created remote topics
replication.factor = 3

############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats",
# "B.checkpoints.internal" and "mm2-offset-syncs.B.internal".
# For anything other than development testing, a value greater than 1,
# such as 3, is recommended to ensure availability.
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# The replication factor for connect internal topics "mm2-configs.B.internal",
# "mm2-offsets.B.internal" and "mm2-status.B.internal".
# For anything other than development testing, a value greater than 1,
# such as 3, is recommended to ensure availability.
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
```
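As a quick sanity check on the `topics.blacklist` patterns above, here is the same pair of regexes exercised in Python (my own sketch, assuming Java's anchored `Pattern.matches` semantics that MM2's topic filter uses) to confirm they exclude MM2's internal topics but not my replicated topic:

```python
import re

# The two exclusion patterns from topics.blacklist, un-escaped from
# properties-file syntax into plain regexes.
patterns = [re.compile(p) for p in (r".*[\-\.]internal", r".*\.replica")]

def is_excluded(topic: str) -> bool:
    # Java's Pattern.matches() requires the whole string to match,
    # so use fullmatch rather than search.
    return any(p.fullmatch(topic) for p in patterns)

# MM2's own internal topics should be excluded...
assert is_excluded("mm2-offset-syncs.b.internal")
assert is_excluded("some-topic.replica")
# ...while the replicated business topic should not be.
assert not is_excluded("order-cmd-request-01")
```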
I'm able to see the messages and such on the 'b' cluster.  I then
terminate the api that was pointing to the 'a' cluster, repoint it to the
'b' cluster, and redeploy.  On startup, the api throws this error:

```
{"@timestamp":"2024-01-25T20:49:32.758Z", "log.level": "WARN", "message":"Error
registering AppInfo mbean", "ecs.version": "1.2.0","process.thread.name":
"main","log.logger":"org.apache.kafka.common.utils.AppInfoParser","
service.name":"prod-usf-order-integration-debug-api","error.type":
"javax.management.InstanceAlreadyExistsException","error.message":
"kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0"
,"error.stack_trace":"javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0\n\tat
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)\n\tat
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)\n\tat
java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)\n\tat
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)\n\tat
org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)\n\tat
org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)\n\tat
org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer
WithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)\n\tat
org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)\n\tat
org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)\n\tat
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)\n\tat
org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)\n\tat
org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)\n\tat
org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)\n\tat
org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)\n\tat
org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)\n\tat
org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)\n\tat
org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)\n\tat
org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)\n\tat
org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)\n\tat
java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat
org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)\n\tat
org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)\n\tat
org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:934)\n\tat
org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:585)\n
\tat
org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144)\n\tat
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:767)\n\tat
org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759)\n\tat
org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426)\n\tat
org.springframework.boot.SpringApplication.run(SpringApplication.java:326)\n\tat
org.springframework.boot.SpringApplication.run(SpringApplication.java:1311)\n\tat
org.springframework.boot.SpringApplication.run(SpringApplication.java:1300)\n\tat
com.usfoods.panamax.integration.order.PanamaxOrderIntegrationApiApplication.main(PanamaxOrderIntegrationApiApplication.java:40)\n"
}


{"@timestamp":"2024-01-25T20:49:34.079Z", "log.level":"ERROR",
"message":"[Consumer
instanceId=usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0,
clientId=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0,
groupId=usfcom4-cg-order-cdc-debug-order_requests-01] Attempt to join group
with generation
Optional[usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0] failed
because the group instance id Generation{generationId=-1, memberId='',
protocol='null'} has been fenced by another instance", "ecs.version":
"1.2.0","process.thread.name":
"org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1",
"log.logger":
"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","
service.name":"prod-usf-order-integration-debug-api"}
{"@timestamp":"2024-01-25T20:49:34.080Z", "log.level": "INFO",
"message":"[Consumer
instanceId=usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0,
clientId=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0,
groupId=usfcom4-cg-order-cdc-debug-order_requests-01] Join group failed
with org.apache.kafka.common.errors.FencedInstanceIdException: The broker
rejected this static consumer since another consumer with the same
group.instance.id has registered with a different member.id.", "ecs.version":
"1.2.0","process.thread.name":
"org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1",
"log.logger":
"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","
service.name":"prod-usf-order-integration-debug-api"}
{"@timestamp":"2024-01-25T20:49:34.081Z", "log.level":"ERROR", "message":"'
group.instance.id' has been fenced", "ecs.version": "1.2.0","
process.thread.name":
"org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1",
"log.logger":
"org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer"
,"service.name":"prod-usf-order-integration-debug-api","error.type":
"org.apache.kafka.common.errors.FencedInstanceIdException","error.message":"The
broker rejected this static consumer since another consumer with the same
group.instance.id has registered with a different member.id.",
"error.stack_trace":"org.apache.kafka.common.errors.FencedInstanceIdException:
The broker rejected this static consumer since another consumer with the
same group.instance.id has registered with a different member.id.\n"}
{"@timestamp":"2024-01-25T20:49:34.084Z", "log.level":"ERROR", "message":"Fatal
consumer exception; stopping container", "ecs.version": "1.2.0","
process.thread.name":
"org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1",
"log.logger":
"org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer"
,"service.name":"prod-usf-order-integration-debug-api"}
```
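For context on the FencedInstanceIdException: the api's consumers use static membership, i.e. each instance sets `group.instance.id`.  In my Spring Boot setup that is configured roughly like the following (a hypothetical illustration, not copied verbatim from my deployment):

```
# Hypothetical example only; the actual key/value in my deployment differ.
spring.kafka.consumer.properties.group.instance.id=order-debug-${HOSTNAME}
```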

I also ran a describe on all the groups on the target side, and this is
the result:

```
./kafka-consumer-groups.sh --bootstrap-server \
  bbb.us-west-2.aws.confluent.cloud:9092 \
  --command-config ../config/cc_poc_prod.properties \
  --describe --all-groups --state

Consumer group 'usfcom4-cg-order-cdc-debug-order_requests-01' has no active members.

GROUP                                         COORDINATOR (ID)                                                    ASSIGNMENT-STRATEGY  STATE  #MEMBERS
usfcom4-cg-order-cdc-debug-order_requests-01  e-1d55.usw2-az2.dom4gj8jzp6.us-west-2.aws.confluent.cloud:9092 (2)                       Empty  0

Consumer group 'usfcom4-cg-order-cdc-order_requests-01' has no active members.

GROUP                                   COORDINATOR (ID)                                                    ASSIGNMENT-STRATEGY  STATE  #MEMBERS
usfcom4-cg-order-cdc-order_requests-01  e-1ed0.usw2-az2.dom4gj8jzp6.us-west-2.aws.confluent.cloud:9092 (0)                       Empty  0
```
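For completeness, the translated offsets themselves can be inspected for a single group with the same tool's `--offsets` describe view, e.g.:

```
./kafka-consumer-groups.sh --bootstrap-server bbb.us-west-2.aws.confluent.cloud:9092 \
  --command-config ../config/cc_poc_prod.properties \
  --describe --group usfcom4-cg-order-cdc-debug-order_requests-01 --offsets
```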

I'm curious why, after shutting down the api, repointing it to the 'b'
cluster, and starting it back up, the 'b' cluster is throwing an
InstanceAlreadyExistsException.  At this point, MM2 is still running.  I
believe I can have MM2 running while the debug api works off the 'b'
cluster?

Any help would be appreciated.

Thanks,
