[ https://issues.apache.org/jira/browse/KAFKA-19318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dennis lucero updated KAFKA-19318:
----------------------------------
        Fix Version/s:     (was: 3.7.1)
    Affects Version/s: 3.9.0
                           (was: 3.7.0)
                           (was: 3.6.1)
                           (was: 3.6.2)
          Description: 
Despite several attempts to migrate from a ZooKeeper cluster to KRaft, the migration fails.

We spawned a new, fully healthy cluster with 18 Kafka nodes connected to 5 ZooKeeper nodes. 9 additional Kafka nodes host the new controllers.
This was tested with Kafka 3.9.0.

The controllers start up with the following logs:
{code:java}
==> /var/log/kafka/kafka-server.log <==
[2025-05-20 15:04:28,798] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2025-05-20 15:04:29,207] INFO [ControllerServer id=111] Starting controller 
(kafka.server.ControllerServer)
[2025-05-20 15:04:29,656] INFO Updated connection-accept-rate max connection 
creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-05-20 15:04:29,784] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=111] Created data-plane acceptor and processors for endpoint : 
ListenerName(CONTROLLER) (kafka.network.SocketServer)
[2025-05-20 15:04:29,792] INFO [SharedServer id=111] Starting SharedServer 
(kafka.server.SharedServer)
[2025-05-20 15:04:29,841] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Recovering unflushed segment 0. 0/1 recovered for 
__cluster_metadata-0. (kafka.log.LogLoader)
[2025-05-20 15:04:29,842] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Loading producer state till offset 0 with message 
format version 2 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,843] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
producer state from offset 0 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,843] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Producer state recovery took 0ms for snapshot load and 
0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,905] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Loading producer state till offset 2709 with message 
format version 2 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,905] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
producer state from offset 2709 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,906] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Producer state recovery took 1ms for snapshot load and 
0ms for segment recovery from offset 2709 (kafka.log.UnifiedLog$)
[2025-05-20 15:04:29,940] INFO Initialized snapshots with IDs 
SortedSet(OffsetAndEpoch(offset=0, epoch=0)) from 
/data/kafka1/kafka/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
[2025-05-20 15:04:29,952] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2025-05-20 15:04:30,191] INFO [ControllerServer id=111] Waiting for controller 
quorum voters future (kafka.server.ControllerServer)
[2025-05-20 15:04:30,191] INFO [ControllerServer id=111] Finished waiting for 
controller quorum voters future (kafka.server.ControllerServer)
[2025-05-20 15:04:30,233] INFO [ZooKeeperClient KRaft Migration] Initializing a 
new session to 
prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181.
 (kafka.zookeeper.ZooKeeperClient)
[2025-05-20 15:04:30,248] INFO [ZooKeeperClient KRaft Migration] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
[2025-05-20 15:04:30,261] INFO [ZooKeeperClient KRaft Migration] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2025-05-20 15:04:30,293] INFO [controller-111-ThrottledChannelReaper-Fetch]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:04:30,293] INFO [controller-111-ThrottledChannelReaper-Produce]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:04:30,294] INFO [controller-111-ThrottledChannelReaper-Request]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:04:30,297] INFO 
[controller-111-ThrottledChannelReaper-ControllerMutation]: Starting 
(kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:04:30,316] INFO [ExpirationReaper-111-AlterAcls]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:04:30,392] INFO [ControllerServer id=111] Waiting for the 
controller metadata publishers to be installed (kafka.server.ControllerServer)
[2025-05-20 15:04:30,393] INFO [ControllerServer id=111] Finished waiting for 
the controller metadata publishers to be installed 
(kafka.server.ControllerServer)
[2025-05-20 15:04:30,393] INFO [SocketServer listenerType=CONTROLLER, 
nodeId=111] Enabling request processing. (kafka.network.SocketServer)
[2025-05-20 15:04:30,394] INFO [ControllerRegistrationManager id=111 
incarnation=mbreItYzTmySXeahJkXxFA] Found registration for 
UwcgQ_r6T_uBVDMCdWLG6A instead of our incarnation. 
(kafka.server.ControllerRegistrationManager)
[2025-05-20 15:04:30,395] INFO Awaiting socket connections on 0.0.0.0:9091. 
(kafka.network.DataPlaneAcceptor)
[2025-05-20 15:04:30,455] INFO 
[controller-111-to-controller-registration-channel-manager]: Starting 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:04:30,455] INFO [ControllerRegistrationManager id=111 
incarnation=mbreItYzTmySXeahJkXxFA] initialized channel manager. 
(kafka.server.ControllerRegistrationManager)
[2025-05-20 15:04:30,455] INFO [ControllerServer id=111] Waiting for all of the 
authorizer futures to be completed (kafka.server.ControllerServer)
[2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Finished waiting for 
all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Waiting for all of the 
SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Finished waiting for 
all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-05-20 15:04:30,457] INFO [ControllerRegistrationManager id=111 
incarnation=mbreItYzTmySXeahJkXxFA] sendControllerRegistration: attempting to 
send ControllerRegistrationRequestData(controllerId=111, 
incarnationId=mbreItYzTmySXeahJkXxFA, zkMigrationReady=true, 
listeners=[Listener(name='CONTROLLER', host='prodkafkacontr11z1.h.ae-rus.net', 
port=9091, securityProtocol=0)], features=[Feature(name='kraft.version', 
minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=21)]) 
(kafka.server.ControllerRegistrationManager)
[2025-05-20 15:04:30,471] INFO 
[controller-111-to-controller-registration-channel-manager]: Recorded new KRaft 
controller, from now on will use node prodkafkacontr11z1.h.ae-rus.net:9091 (id: 
111 rack: null) (kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:04:30,505] INFO [KafkaRaftServer nodeId=111] Kafka Server 
started (kafka.server.KafkaRaftServer)
[2025-05-20 15:04:30,579] INFO [ControllerRegistrationManager id=111 
incarnation=mbreItYzTmySXeahJkXxFA] Our registration has been persisted to the 
metadata log. (kafka.server.ControllerRegistrationManager)
[2025-05-20 15:04:30,581] INFO [ControllerRegistrationManager id=111 
incarnation=mbreItYzTmySXeahJkXxFA] RegistrationResponseHandler: controller 
acknowledged ControllerRegistrationRequest. 
(kafka.server.ControllerRegistrationManager){code}
{code:java}
cat /var/log/kafka/controller.log
[2025-05-20 15:04:30,357] TRACE [KRaftMigrationDriver id=111] Recovered 
migration state from ZK in 68765354 ns. Transitioned migration state from 
ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=-1, 
migrationZkVersion=-2, controllerZkEpoch=-1, controllerZkVersion=-2} to 
ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=1747648116174, 
migrationZkVersion=0, controllerZkEpoch=-1, controllerZkVersion=-2} 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,358] INFO [KRaftMigrationDriver id=111] Initial migration 
of ZK metadata is not done. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,358] INFO [KRaftMigrationDriver id=111] 111 transitioning 
from UNINITIALIZED to INACTIVE state 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] Became active 
migration driver in 27621312 ns. Transitioned migration state from 
ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=1747648116174, 
migrationZkVersion=0, controllerZkEpoch=-1, controllerZkVersion=-2} to 
ZkMigrationLeadershipState{kraftControllerId=111, kraftControllerEpoch=2, 
kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=1747648116174, 
migrationZkVersion=0, controllerZkEpoch=-1, controllerZkVersion=-2} 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] 111 transitioning 
from INACTIVE to WAIT_FOR_CONTROLLER_QUORUM state 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] Controller Quorum 
is ready for Zk to KRaft migration. Now waiting for ZK brokers. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] 111 transitioning 
from WAIT_FOR_CONTROLLER_QUORUM to WAIT_FOR_BROKERS state 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,389] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,494] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,579] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,580] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,645] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,645] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,701] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,701] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,730] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,730] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,778] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,861] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,861] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,905] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:30,905] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:31,202] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:31,202] INFO [KRaftMigrationDriver id=111] No brokers are 
known to KRaft, waiting for brokers to register. 
(org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:31,278] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
[2025-05-20 15:04:31,778] TRACE [KRaftMigrationDriver id=111] Received metadata 
delta, but the controller is not in dual-write mode. Ignoring this metadata 
update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code}
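For readability, here is an illustrative summary of the KRaftMigrationDriver phases visible in this controller.log. The names are taken from the log messages above (dual-write is mentioned but never entered); this is a reading aid, not the actual Kafka enum:
{code:java}
// Illustrative sketch of the migration phases seen in the controller.log above.
// Names come from the log messages, not from the Kafka sources.
enum ObservedMigrationPhase {
    UNINITIALIZED,               // driver created, state not yet recovered from ZK
    INACTIVE,                    // "Initial migration of ZK metadata is not done"
    WAIT_FOR_CONTROLLER_QUORUM,  // controller quorum must be ready before migrating
    WAIT_FOR_BROKERS,            // waiting for ZK brokers to register with KRaft (stuck here)
    DUAL_WRITE                   // metadata written to both KRaft and ZK; never reached in this log
}
{code}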
The config on the controller node is the following
{code:java}
cat /etc/kafka/server.properties
##
# Ansible managed
#host.name=prodkafkacontr14z4.h.ae-rus.net
cluster.name=MrgOne
node.id=114
controller.rack=z4
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
process.roles=controller
delete.topic.enable=True
auto.create.topics.enable=True
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners=CONTROLLER://:9091
advertised.listeners=CONTROLLER://prodkafkacontr14z4.h.ae-rus.net:9091
controller.listener.names=CONTROLLER
inter.broker.listener.name=EXTERNAL
controller.quorum.voters=1...@prodkafkacontr11z1.h.ae-rus.net:9091,1...@prodkafkacontr12z1.h.ae-rus.net:9091,1...@prodkafkacontr13z1.h.ae-rus.net:9091,1...@prodkafkacontr17z3.h.ae-rus.net:9091,1...@prodkafkacontr18z3.h.ae-rus.net:9091,1...@prodkafkacontr19z3.h.ae-rus.net:9091,1...@prodkafkacontr14z4.h.ae-rus.net:9091,1...@prodkafkacontr15z4.h.ae-rus.net:9091,1...@prodkafkacontr16z4.h.ae-rus.net:9091
controller.quorum.bootstrap.servers=prodkafkacontr11z1.h.ae-rus.net:9091,prodkafkacontr12z1.h.ae-rus.net:9091,prodkafkacontr13z1.h.ae-rus.net:9091,prodkafkacontr17z3.h.ae-rus.net:9091,prodkafkacontr18z3.h.ae-rus.net:9091,prodkafkacontr19z3.h.ae-rus.net:9091,prodkafkacontr14z4.h.ae-rus.net:9091,prodkafkacontr15z4.h.ae-rus.net:9091,prodkafkacontr16z4.h.ae-rus.net:9091
num.network.threads=128
background.threads=128
num.io.threads=128
socket.send.buffer.bytes=2097152
socket.receive.buffer.bytes=2097152
socket.request.max.bytes=104857600
queued.max.requests=32768
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
log.dirs=/data/kafka1/kafka
num.recovery.threads.per.data.dir=16
log.flush.interval.messages=10240
log.flush.interval.ms=5000
log.retention.hours=48
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=1000
auto.leader.rebalance.enable=True
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=20
default.replication.factor=3
num.replica.fetchers=64
replica.fetch.max.bytes=41943040
replica.fetch.wait.max.ms=1000
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
############################# Extra Properties #############################
offsets.topic.num.partitions=150
message.max.bytes=20971520
batch.size=65536
zookeeper.connect=prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181
rack.aware.assignment.strategy=balance_subtopology
rack.aware.assignment.traffic_cost=org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
compression.gzip.level=5
zookeeper.metadata.migration.enable=True
zookeeper.metadata.migration.min.batch.size=512
initial.broker.registration.timeout.ms=18000{code}
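As a side note, one way to sanity-check that the nine controllers above actually formed a KRaft quorum before moving brokers is the Java Admin client's describeMetadataQuorum(). A minimal sketch (the bootstrap address is illustrative; any reachable broker listener should work):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;

public class QuorumCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative bootstrap address taken from the broker config below.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "prod-kafka-12.ae-rus.net:9092");
        try (Admin admin = Admin.create(props)) {
            // Describe the KRaft metadata quorum: leader plus per-voter replication state.
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Quorum leader id: " + quorum.leaderId());
            for (QuorumInfo.ReplicaState voter : quorum.voters()) {
                System.out.println("Voter " + voter.replicaId()
                        + " logEndOffset=" + voter.logEndOffset()
                        + " lastFetchTimestamp=" + voter.lastFetchTimestamp());
            }
        }
    }
}
{code}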
The config on the broker node is the following
{code:java}
$ cat /etc/kafka/server.properties
##
# Ansible managed
#host.name=prodkafka2z1.h.ae-rus.net
cluster.name=MrgOne
node.id=2
broker.rack=z1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
process.roles=broker
delete.topic.enable=True
auto.create.topics.enable=True
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners=EXTERNAL://:9092,INTERNAL://:29092
advertised.listeners=EXTERNAL://prod-kafka-12.ae-rus.net:9092,INTERNAL://prod-kafka-12.ae-rus.net:29092
controller.listener.names=CONTROLLER
inter.broker.listener.name=EXTERNAL
controller.quorum.voters=1...@prodkafkacontr11z1.h.ae-rus.net:9091,1...@prodkafkacontr12z1.h.ae-rus.net:9091,1...@prodkafkacontr13z1.h.ae-rus.net:9091,1...@prodkafkacontr17z3.h.ae-rus.net:9091,1...@prodkafkacontr18z3.h.ae-rus.net:9091,1...@prodkafkacontr19z3.h.ae-rus.net:9091,1...@prodkafkacontr14z4.h.ae-rus.net:9091,1...@prodkafkacontr15z4.h.ae-rus.net:9091,1...@prodkafkacontr16z4.h.ae-rus.net:9091
controller.quorum.bootstrap.servers=prodkafkacontr11z1.h.ae-rus.net:9091,prodkafkacontr12z1.h.ae-rus.net:9091,prodkafkacontr13z1.h.ae-rus.net:9091,prodkafkacontr17z3.h.ae-rus.net:9091,prodkafkacontr18z3.h.ae-rus.net:9091,prodkafkacontr19z3.h.ae-rus.net:9091,prodkafkacontr14z4.h.ae-rus.net:9091,prodkafkacontr15z4.h.ae-rus.net:9091,prodkafkacontr16z4.h.ae-rus.net:9091
num.network.threads=128
background.threads=128
num.io.threads=128
socket.send.buffer.bytes=2097152
socket.receive.buffer.bytes=2097152
socket.request.max.bytes=104857600
queued.max.requests=32768
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
log.dirs=/data/kafka1/kafka,/data/kafka2/kafka,/data/kafka3/kafka,/data/kafka4/kafka,/data/kafka5/kafka,/data/kafka6/kafka
num.recovery.threads.per.data.dir=16
log.flush.interval.messages=10240
log.flush.interval.ms=5000
log.retention.hours=48
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=1000
auto.leader.rebalance.enable=True
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=20
default.replication.factor=3
num.replica.fetchers=64
replica.fetch.max.bytes=41943040
replica.fetch.wait.max.ms=1000
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
############################# Extra Properties #############################
offsets.topic.num.partitions=150
message.max.bytes=20971520
batch.size=65536
zookeeper.connect=prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181
rack.aware.assignment.strategy=balance_subtopology
rack.aware.assignment.traffic_cost=org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
compression.gzip.level=5
zookeeper.metadata.migration.enable=True
zookeeper.metadata.migration.min.batch.size=512
initial.broker.registration.timeout.ms=18000
{code}
When trying to move to the next step (`Migrating brokers to KRaft`), the broker fails to register with the controller quorum and crashes once initial.broker.registration.timeout.ms (18000 ms) has elapsed.
{code:java}
==> /var/log/kafka/kafka-server.log <==
[2025-05-20 15:24:07,521] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2025-05-20 15:24:07,826] INFO [BrokerServer id=2] Transition from SHUTDOWN to 
STARTING (kafka.server.BrokerServer)
[2025-05-20 15:24:07,827] INFO [SharedServer id=2] Starting SharedServer 
(kafka.server.SharedServer)
[2025-05-20 15:24:07,884] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Recovering unflushed segment 21124. 0/1 recovered for 
__cluster_metadata-0. (kafka.log.LogLoader)
[2025-05-20 15:24:07,885] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Loading producer state till offset 21124 with message 
format version 2 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,886] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
producer state from offset 21124 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,893] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Producer state recovery took 0ms for snapshot load and 
7ms for segment recovery from offset 21124 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,958] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Loading producer state till offset 31424 with message 
format version 2 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,958] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
producer state from offset 31424 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,960] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/data/kafka1/kafka] Producer state recovery took 2ms for snapshot load and 
0ms for segment recovery from offset 31424 (kafka.log.UnifiedLog$)
[2025-05-20 15:24:07,995] INFO Initialized snapshots with IDs 
SortedSet(OffsetAndEpoch(offset=21124, epoch=7)) from 
/data/kafka1/kafka/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
[2025-05-20 15:24:08,023] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2025-05-20 15:24:08,301] INFO [BrokerServer id=2] Starting broker 
(kafka.server.BrokerServer)
[2025-05-20 15:24:08,318] INFO [broker-2-ThrottledChannelReaper-Fetch]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:24:08,319] INFO [broker-2-ThrottledChannelReaper-Produce]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:24:08,320] INFO [broker-2-ThrottledChannelReaper-Request]: 
Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:24:08,322] INFO 
[broker-2-ThrottledChannelReaper-ControllerMutation]: Starting 
(kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-05-20 15:24:08,365] INFO [BrokerServer id=2] Waiting for controller 
quorum voters future (kafka.server.BrokerServer)
[2025-05-20 15:24:08,365] INFO [BrokerServer id=2] Finished waiting for 
controller quorum voters future (kafka.server.BrokerServer)
[2025-05-20 15:24:08,371] INFO 
[broker-2-to-controller-forwarding-channel-manager]: Starting 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:08,592] INFO Updated connection-accept-rate max connection 
creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-05-20 15:24:08,687] INFO [SocketServer listenerType=BROKER, nodeId=2] 
Created data-plane acceptor and processors for endpoint : 
ListenerName(EXTERNAL) (kafka.network.SocketServer)
[2025-05-20 15:24:08,688] INFO Updated connection-accept-rate max connection 
creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-05-20 15:24:08,759] INFO [SocketServer listenerType=BROKER, nodeId=2] 
Created data-plane acceptor and processors for endpoint : 
ListenerName(INTERNAL) (kafka.network.SocketServer)
[2025-05-20 15:24:08,765] INFO 
[broker-2-to-controller-alter-partition-channel-manager]: Starting 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:08,771] INFO 
[broker-2-to-controller-directory-assignments-channel-manager]: Starting 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:08,788] INFO [ExpirationReaper-2-Produce]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,788] INFO [ExpirationReaper-2-Fetch]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,789] INFO [ExpirationReaper-2-DeleteRecords]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,790] INFO [ExpirationReaper-2-ElectLeader]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,790] INFO [ExpirationReaper-2-RemoteFetch]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,808] INFO [ExpirationReaper-2-Heartbeat]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,808] INFO [ExpirationReaper-2-Rebalance]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,863] INFO 
[broker-2-to-controller-heartbeat-channel-manager]: Starting 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:08,865] INFO [BrokerLifecycleManager id=2] Incarnation 
ZvarJwVqQZOUzzobd91HlQ of broker 2 in cluster 9ca1s2tbTGm1Bi35aynf6Q is now 
STARTING. (kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:08,898] INFO [ExpirationReaper-2-AlterAcls]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2025-05-20 15:24:08,978] INFO [BrokerServer id=2] Waiting for the broker 
metadata publishers to be installed (kafka.server.BrokerServer)
[2025-05-20 15:24:08,979] INFO [BrokerServer id=2] Finished waiting for the 
broker metadata publishers to be installed (kafka.server.BrokerServer)
[2025-05-20 15:24:08,979] INFO [BrokerServer id=2] Waiting for the controller 
to acknowledge that we are caught up (kafka.server.BrokerServer)
{code}
{code:java}
==> /var/log/kafka/kafka-server.log <==
[2025-05-20 15:24:13,380] INFO [BrokerLifecycleManager id=2] Unable to register 
the broker because the RPC got timed out before it could be sent. 
(kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:17,990] INFO [BrokerLifecycleManager id=2] Unable to register 
the broker because the RPC got timed out before it could be sent. 
(kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:22,701] INFO [BrokerLifecycleManager id=2] Unable to register 
the broker because the RPC got timed out before it could be sent. 
(kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:26,864] ERROR [BrokerLifecycleManager id=2] Shutting down 
because we were unable to register with the controller quorum. 
(kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:26,864] INFO [BrokerLifecycleManager id=2] Transitioning from 
STARTING to SHUTTING_DOWN. (kafka.server.BrokerLifecycleManager)
[2025-05-20 15:24:26,865] INFO 
[broker-2-to-controller-heartbeat-channel-manager]: Shutting down 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:26,865] INFO 
[broker-2-to-controller-heartbeat-channel-manager]: Stopped 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:26,865] INFO 
[broker-2-to-controller-heartbeat-channel-manager]: Shutdown completed 
(kafka.server.NodeToControllerRequestThread)
[2025-05-20 15:24:26,865] ERROR [BrokerServer id=2] Received a fatal error 
while waiting for the controller to acknowledge that we are caught up 
(kafka.server.BrokerServer)
java.util.concurrent.CancellationException
    at 
java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510)
    at 
kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:638)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:191)
    at java.base/java.lang.Thread.run(Thread.java:1583)
[2025-05-20 15:24:26,867] INFO Node to controller channel manager for heartbeat 
shutdown (kafka.server.NodeToControllerChannelManagerImpl)
[2025-05-20 15:24:26,868] INFO [BrokerServer id=2] Transition from STARTING to 
STARTED (kafka.server.BrokerServer)
[2025-05-20 15:24:26,870] ERROR [BrokerServer id=2] Fatal error during broker 
startup. Prepare to shutdown (kafka.server.BrokerServer)
java.lang.RuntimeException: Received a fatal error while waiting for the 
controller to acknowledge that we are caught up
    at 
org.apache.kafka.server.util.FutureUtils.waitWithLogging(FutureUtils.java:73)
    at kafka.server.BrokerServer.startup(BrokerServer.scala:502)
    at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
    at 
kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
    at kafka.Kafka$.main(Kafka.scala:112)
    at kafka.Kafka.main(Kafka.scala)
Caused by: java.util.concurrent.CancellationException
    at 
java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510)
    at 
kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:638)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:191)
    at java.base/java.lang.Thread.run(Thread.java:1583)
[2025-05-20 15:24:26,870] INFO [BrokerServer id=2] Transition from STARTED to 
SHUTTING_DOWN (kafka.server.BrokerServer)
[2025-05-20 15:24:26,871] INFO [BrokerServer id=2] shutting down 
(kafka.server.BrokerServer)
[2025-05-20 15:24:26,871] INFO [SocketServer listenerType=BROKER, nodeId=2] 
Stopping socket server request processors (kafka.network.SocketServer)
[2025-05-20 15:24:26,874] ERROR Cannot invoke 
"java.nio.channels.ServerSocketChannel.close()" because the return value of 
"kafka.network.Acceptor.serverChannel()" is null 
(kafka.network.DataPlaneAcceptor)
java.lang.NullPointerException: Cannot invoke 
"java.nio.channels.ServerSocketChannel.close()" because the return value of 
"kafka.network.Acceptor.serverChannel()" is null
    at kafka.network.Acceptor.$anonfun$closeAll$2(SocketServer.scala:711)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
    at kafka.network.Acceptor.closeAll(SocketServer.scala:711)
    at kafka.network.Acceptor.close(SocketServer.scala:678)
    at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$4(SocketServer.scala:287)
    at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$4$adapted(SocketServer.scala:287)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:287)
    at kafka.server.BrokerServer.$anonfun$shutdown$5(BrokerServer.scala:638)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
    at kafka.server.BrokerServer.shutdown(BrokerServer.scala:638)
    at kafka.server.KafkaBroker.shutdown(KafkaBroker.scala:98)
    at kafka.server.KafkaBroker.shutdown$(KafkaBroker.scala:98)
    at kafka.server.BrokerServer.shutdown(BrokerServer.scala:67)
    at kafka.server.BrokerServer.startup(BrokerServer.scala:555)
    at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
    at 
kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
    at kafka.Kafka$.main(Kafka.scala:112)
    at kafka.Kafka.main(Kafka.scala)
[2025-05-20 15:24:26,906] ERROR Cannot invoke 
"java.nio.channels.ServerSocketChannel.close()" because the return value of 
"kafka.network.Acceptor.serverChannel()" is null 
(kafka.network.DataPlaneAcceptor)
java.lang.NullPointerException: Cannot invoke 
"java.nio.channels.ServerSocketChannel.close()" because the return value of 
"kafka.network.Acceptor.serverChannel()" is null
    at kafka.network.Acceptor.$anonfun$closeAll$2(SocketServer.scala:711)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
    at kafka.network.Acceptor.closeAll(SocketServer.scala:711)
    at kafka.network.Acceptor.close(SocketServer.scala:678)
    at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$4(SocketServer.scala:287)
    at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$4$adapted(SocketServer.scala:287)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:287)
    at kafka.server.BrokerServer.$anonfun$shutdown$5(BrokerServer.scala:638)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
    at kafka.server.BrokerServer.shutdown(BrokerServer.scala:638)
    at kafka.server.KafkaBroker.shutdown(KafkaBroker.scala:98)
    at kafka.server.KafkaBroker.shutdown$(KafkaBroker.scala:98)
    at kafka.server.BrokerServer.shutdown(BrokerServer.scala:67)
    at kafka.server.BrokerServer.startup(BrokerServer.scala:555)
    at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
    at 
kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
    at kafka.Kafka$.main(Kafka.scala:112)
    at kafka.Kafka.main(Kafka.scala)
[2025-05-20 15:24:26,932] INFO [SocketServer listenerType=BROKER, nodeId=2] 
Stopped socket server request processors (kafka.network.SocketServer) {code}
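To cross-check the "No brokers are known to KRaft" and "Unable to register the broker" messages from the client side, a minimal Admin-client sketch (bootstrap address illustrative) that prints the controller and the brokers currently present in the cluster metadata:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class RegisteredBrokers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative bootstrap address; any broker still serving clients will do.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "prod-kafka-12.ae-rus.net:9092");
        try (Admin admin = Admin.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            // Which node the cluster currently reports as controller.
            System.out.println("Controller: " + cluster.controller().get());
            // Brokers currently registered in the cluster metadata.
            for (Node node : cluster.nodes().get()) {
                System.out.println("Broker " + node.idString() + " @ " + node.host() + ":" + node.port());
            }
        }
    }
}
{code}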

  was:
Despite several attempts to migrate from a ZooKeeper cluster to KRaft, the migration fails.

We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 ZooKeeper nodes. 3 new Kafka nodes host the new controllers.
This was tested with Kafka 3.6.1, 3.6.2 and 3.7.0.

It might be linked to KAFKA-15330.

The controllers start without issue. When the brokers are then configured for the migration, the migration does not start. Once the last broker is restarted, we get the following logs.
{code:java}
[2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped 
(kafka.server.ReplicaFetcherThread)
[2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown completed 
(kafka.server.ReplicaFetcherThread)
{code}
Then we only get the following every 30 seconds:
{code:java}
[2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
Unable to register the broker because the RPC got timed out before it could be 
sent. (kafka.server.BrokerLifecycleManager)
[2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
Unable to register the broker because the RPC got timed out before it could be 
sent. (kafka.server.BrokerLifecycleManager)
[2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
Unable to register the broker because the RPC got timed out before it could be 
sent. (kafka.server.BrokerLifecycleManager){code}

The config on the controller node is the following
{code:java}
kafka0202e1 ~]$  sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | 
grep -v password | sort
advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net
broker.rack=e1
controller.listener.names=CONTROLLER
controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
default.replication.factor=3
delete.topic.enable=false
group.initial.rebalance.delay.ms=3000
inter.broker.protocol.version=3.7
listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093
listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=/data/kafka
log.message.format.version=3.6
log.retention.check.interval.ms=300000
log.retention.hours=240
log.segment.bytes=1073741824
min.insync.replicas=2
node.id=20
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
process.roles=controller
security.inter.broker.protocol=SSL
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.cipher.suites=TLS_AES_256_GCM_SHA384
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS
ssl.keystore.location=/etc/kafka/ssl/keystore.ts
ssl.keystore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
ssl.truststore.location=/etc/kafka/ssl/truststore.ts
transaction.state.log.min.isr=3
transaction.state.log.replication.factor=3
unclean.leader.election.enable=false
zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181,
zookeeper.metadata.migration.enable=true
 {code}

The config on the broker node is the following
{code}
$ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | grep -v password 
| sort
advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net
advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092
broker.id=12
broker.rack=e3
controller.listener.names=CONTROLLER # added once all controllers were started
controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
 # added once all controllers were started
default.replication.factor=3
delete.topic.enable=false
group.initial.rebalance.delay.ms=3000
inter.broker.protocol.version=3.7
listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092
log.dirs=/data/kafka
log.retention.check.interval.ms=300000
log.retention.hours=240
log.segment.bytes=1073741824
min.insync.replicas=2
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
security.inter.broker.protocol=SSL
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
ssl.cipher.suites=TLS_AES_256_GCM_SHA384
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS
ssl.keystore.location=/etc/kafka/ssl/keystore.ts
ssl.keystore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
ssl.truststore.location=/etc/kafka/ssl/truststore.ts
transaction.state.log.min.isr=3
transaction.state.log.replication.factor=3
unclean.leader.election.enable=false
zookeeper.connect=10.133.65.199:2181,10.135.65.199:2181,10.137.64.56:2181,
zookeeper.connection.timeout.ms=6000
zookeeper.metadata.migration.enable=true # added once all controllers were 
started
{code}

When trying to move to the next step (`Migrating brokers to KRaft`), the broker fails to register with the controller quorum and crashes.
{code}
[2024-06-03 15:33:21,553] INFO [BrokerLifecycleManager id=12] Unable to 
register the broker because the RPC got timed out before it could be sent. 
(kafka.server.BrokerLifecycleManager)
[2024-06-03 15:33:32,549] ERROR [BrokerLifecycleManager id=12] Shutting down 
because we were unable to register with the controller quorum. 
(kafka.server.BrokerLifecycleManager)
[2024-06-03 15:33:32,550] INFO [BrokerLifecycleManager id=12] Transitioning 
from STARTING to SHUTTING_DOWN. (kafka.server.BrokerLifecycleManager)
[2024-06-03 15:33:32,551] INFO 
[broker-12-to-controller-heartbeat-channel-manager]: Shutting down 
(kafka.server.NodeToControllerRequestThread)
[2024-06-03 15:33:32,551] INFO 
[broker-12-to-controller-heartbeat-channel-manager]: Shutdown completed 
(kafka.server.NodeToControllerRequestThread)
[2024-06-03 15:33:32,551] ERROR [BrokerServer id=12] Received a fatal error 
while waiting for the controller to acknowledge that we are caught up 
(kafka.server.BrokerServer)
java.util.concurrent.CancellationException
{code}


> Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-19318
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19318
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>    Affects Versions: 3.9.0
>            Reporter: dennis lucero
>            Priority: Major
>
> Despite several attempts to migrate from Zookeeper cluster to Kraft, it 
> failed to properly migrate.
> We spawn a need cluster fully healthy with 18 Kafka nodes connected to 5 
> Zookeeper nodes. 9 new Kafka nodes are there for the new controllers.
> It was tested with Kafka 3.9.0
> The controllers are started with:
> {code:java}
> ==> /var/log/kafka/kafka-server.log <==
> [2025-05-20 15:04:28,798] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2025-05-20 15:04:29,207] INFO [ControllerServer id=111] Starting controller 
> (kafka.server.ControllerServer)
> [2025-05-20 15:04:29,656] INFO Updated connection-accept-rate max connection 
> creation rate to 2147483647 (kafka.network.ConnectionQuotas)
> [2025-05-20 15:04:29,784] INFO [SocketServer listenerType=CONTROLLER, 
> nodeId=111] Created data-plane acceptor and processors for endpoint : 
> ListenerName(CONTROLLER) (kafka.network.SocketServer)
> [2025-05-20 15:04:29,792] INFO [SharedServer id=111] Starting SharedServer 
> (kafka.server.SharedServer)
> [2025-05-20 15:04:29,841] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Recovering unflushed segment 0. 0/1 recovered for 
> __cluster_metadata-0. (kafka.log.LogLoader)
> [2025-05-20 15:04:29,842] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Loading producer state till offset 0 with message 
> format version 2 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,843] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
> producer state from offset 0 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,843] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Producer state recovery took 0ms for snapshot load 
> and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,905] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Loading producer state till offset 2709 with message 
> format version 2 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,905] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
> producer state from offset 2709 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,906] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Producer state recovery took 1ms for snapshot load 
> and 0ms for segment recovery from offset 2709 (kafka.log.UnifiedLog$)
> [2025-05-20 15:04:29,940] INFO Initialized snapshots with IDs 
> SortedSet(OffsetAndEpoch(offset=0, epoch=0)) from 
> /data/kafka1/kafka/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
> [2025-05-20 15:04:29,952] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2025-05-20 15:04:30,191] INFO [ControllerServer id=111] Waiting for 
> controller quorum voters future (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,191] INFO [ControllerServer id=111] Finished waiting for 
> controller quorum voters future (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,233] INFO [ZooKeeperClient KRaft Migration] Initializing 
> a new session to 
> prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181.
>  (kafka.zookeeper.ZooKeeperClient)
> [2025-05-20 15:04:30,248] INFO [ZooKeeperClient KRaft Migration] Waiting 
> until connected. (kafka.zookeeper.ZooKeeperClient)
> [2025-05-20 15:04:30,261] INFO [ZooKeeperClient KRaft Migration] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2025-05-20 15:04:30,293] INFO [controller-111-ThrottledChannelReaper-Fetch]: 
> Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:04:30,293] INFO 
> [controller-111-ThrottledChannelReaper-Produce]: Starting 
> (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:04:30,294] INFO 
> [controller-111-ThrottledChannelReaper-Request]: Starting 
> (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:04:30,297] INFO 
> [controller-111-ThrottledChannelReaper-ControllerMutation]: Starting 
> (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:04:30,316] INFO [ExpirationReaper-111-AlterAcls]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:04:30,392] INFO [ControllerServer id=111] Waiting for the 
> controller metadata publishers to be installed (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,393] INFO [ControllerServer id=111] Finished waiting for 
> the controller metadata publishers to be installed 
> (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,393] INFO [SocketServer listenerType=CONTROLLER, 
> nodeId=111] Enabling request processing. (kafka.network.SocketServer)
> [2025-05-20 15:04:30,394] INFO [ControllerRegistrationManager id=111 
> incarnation=mbreItYzTmySXeahJkXxFA] Found registration for 
> UwcgQ_r6T_uBVDMCdWLG6A instead of our incarnation. 
> (kafka.server.ControllerRegistrationManager)
> [2025-05-20 15:04:30,395] INFO Awaiting socket connections on 0.0.0.0:9091. 
> (kafka.network.DataPlaneAcceptor)
> [2025-05-20 15:04:30,455] INFO 
> [controller-111-to-controller-registration-channel-manager]: Starting 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:04:30,455] INFO [ControllerRegistrationManager id=111 
> incarnation=mbreItYzTmySXeahJkXxFA] initialized channel manager. 
> (kafka.server.ControllerRegistrationManager)
> [2025-05-20 15:04:30,455] INFO [ControllerServer id=111] Waiting for all of 
> the authorizer futures to be completed (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Finished waiting for 
> all of the authorizer futures to be completed (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Waiting for all of 
> the SocketServer Acceptors to be started (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,456] INFO [ControllerServer id=111] Finished waiting for 
> all of the SocketServer Acceptors to be started 
> (kafka.server.ControllerServer)
> [2025-05-20 15:04:30,457] INFO [ControllerRegistrationManager id=111 
> incarnation=mbreItYzTmySXeahJkXxFA] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=111, 
> incarnationId=mbreItYzTmySXeahJkXxFA, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLLER', 
> host='prodkafkacontr11z1.h.ae-rus.net', port=9091, securityProtocol=0)], 
> features=[Feature(name='kraft.version', minSupportedVersion=0, 
> maxSupportedVersion=1), Feature(name='metadata.version', 
> minSupportedVersion=1, maxSupportedVersion=21)]) 
> (kafka.server.ControllerRegistrationManager)
> [2025-05-20 15:04:30,471] INFO 
> [controller-111-to-controller-registration-channel-manager]: Recorded new 
> KRaft controller, from now on will use node 
> prodkafkacontr11z1.h.ae-rus.net:9091 (id: 111 rack: null) 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:04:30,505] INFO [KafkaRaftServer nodeId=111] Kafka Server 
> started (kafka.server.KafkaRaftServer)
> [2025-05-20 15:04:30,579] INFO [ControllerRegistrationManager id=111 
> incarnation=mbreItYzTmySXeahJkXxFA] Our registration has been persisted to 
> the metadata log. (kafka.server.ControllerRegistrationManager)
> [2025-05-20 15:04:30,581] INFO [ControllerRegistrationManager id=111 
> incarnation=mbreItYzTmySXeahJkXxFA] RegistrationResponseHandler: controller 
> acknowledged ControllerRegistrationRequest. 
> (kafka.server.ControllerRegistrationManager){code}
> {code:java}
> cat /var/log/kafka/controller.log
> [2025-05-20 15:04:30,357] TRACE [KRaftMigrationDriver id=111] Recovered 
> migration state from ZK in 68765354 ns. Transitioned migration state from 
> ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
> kraftMetadataOffset=-1, kraftMetadataEpoch=-1, lastUpdatedTimeMs=-1, 
> migrationZkVersion=-2, controllerZkEpoch=-1, controllerZkVersion=-2} to 
> ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
> kraftMetadataOffset=-1, kraftMetadataEpoch=-1, 
> lastUpdatedTimeMs=1747648116174, migrationZkVersion=0, controllerZkEpoch=-1, 
> controllerZkVersion=-2} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,358] INFO [KRaftMigrationDriver id=111] Initial 
> migration of ZK metadata is not done. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,358] INFO [KRaftMigrationDriver id=111] 111 
> transitioning from UNINITIALIZED to INACTIVE state 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] Became active 
> migration driver in 27621312 ns. Transitioned migration state from 
> ZkMigrationLeadershipState{kraftControllerId=-1, kraftControllerEpoch=-1, 
> kraftMetadataOffset=-1, kraftMetadataEpoch=-1, 
> lastUpdatedTimeMs=1747648116174, migrationZkVersion=0, controllerZkEpoch=-1, 
> controllerZkVersion=-2} to ZkMigrationLeadershipState{kraftControllerId=111, 
> kraftControllerEpoch=2, kraftMetadataOffset=-1, kraftMetadataEpoch=-1, 
> lastUpdatedTimeMs=1747648116174, migrationZkVersion=0, controllerZkEpoch=-1, 
> controllerZkVersion=-2} 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] 111 
> transitioning from INACTIVE to WAIT_FOR_CONTROLLER_QUORUM state 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] Controller 
> Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,388] INFO [KRaftMigrationDriver id=111] 111 
> transitioning from WAIT_FOR_CONTROLLER_QUORUM to WAIT_FOR_BROKERS state 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,389] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,494] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,579] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,580] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,645] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,645] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,701] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,701] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,730] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,730] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,778] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,861] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,861] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,905] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:30,905] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:31,202] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:31,202] INFO [KRaftMigrationDriver id=111] No brokers are 
> known to KRaft, waiting for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:31,278] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [2025-05-20 15:04:31,778] TRACE [KRaftMigrationDriver id=111] Received 
> metadata delta, but the controller is not in dual-write mode. Ignoring this 
> metadata update. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) 
> {code}
> The config on the controller node is the following
> {code:java}
> cat /etc/kafka/server.properties
> ##
> # Ansible managed
> #host.name=prodkafkacontr14z4.h.ae-rus.net
> cluster.name=MrgOne
> node.id=114
> controller.rack=z4
> replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
> process.roles=controller
> delete.topic.enable=True
> auto.create.topics.enable=True
> listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> listeners=CONTROLLER://:9091
> advertised.listeners=CONTROLLER://prodkafkacontr14z4.h.ae-rus.net:9091
> controller.listener.names=CONTROLLER
> inter.broker.listener.name=EXTERNAL
> controller.quorum.voters=1...@prodkafkacontr11z1.h.ae-rus.net:9091,1...@prodkafkacontr12z1.h.ae-rus.net:9091,1...@prodkafkacontr13z1.h.ae-rus.net:9091,1...@prodkafkacontr17z3.h.ae-rus.net:9091,1...@prodkafkacontr18z3.h.ae-rus.net:9091,1...@prodkafkacontr19z3.h.ae-rus.net:9091,1...@prodkafkacontr14z4.h.ae-rus.net:9091,1...@prodkafkacontr15z4.h.ae-rus.net:9091,1...@prodkafkacontr16z4.h.ae-rus.net:9091
> controller.quorum.bootstrap.servers=prodkafkacontr11z1.h.ae-rus.net:9091,prodkafkacontr12z1.h.ae-rus.net:9091,prodkafkacontr13z1.h.ae-rus.net:9091,prodkafkacontr17z3.h.ae-rus.net:9091,prodkafkacontr18z3.h.ae-rus.net:9091,prodkafkacontr19z3.h.ae-rus.net:9091,prodkafkacontr14z4.h.ae-rus.net:9091,prodkafkacontr15z4.h.ae-rus.net:9091,prodkafkacontr16z4.h.ae-rus.net:9091
> num.network.threads=128
> background.threads=128
> num.io.threads=128
> socket.send.buffer.bytes=2097152
> socket.receive.buffer.bytes=2097152
> socket.request.max.bytes=104857600
> queued.max.requests=32768
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> log.dirs=/data/kafka1/kafka
> num.recovery.threads.per.data.dir=16
> log.flush.interval.messages=10240
> log.flush.interval.ms=5000
> log.retention.hours=48
> log.retention.bytes=1073741824
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> group.initial.rebalance.delay.ms=1000
> auto.leader.rebalance.enable=True
> leader.imbalance.check.interval.seconds=300
> leader.imbalance.per.broker.percentage=20
> default.replication.factor=3
> num.replica.fetchers=64
> replica.fetch.max.bytes=41943040
> replica.fetch.wait.max.ms=1000
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30000
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=10000
> ############################# Extra Properties #############################
> offsets.topic.num.partitions=150
> message.max.bytes=20971520
> batch.size=65536
> zookeeper.connect=prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181
> rack.aware.assignment.strategy=balance_subtopology
> rack.aware.assignment.traffic_cost=org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
> compression.gzip.level=5
> zookeeper.metadata.migration.enable=True
> zookeeper.metadata.migration.min.batch.size=512
> initial.broker.registration.timeout.ms=18000
> {code}
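> The controllers only expose `listeners=CONTROLLER://:9091`, and the broker-side failure further down is a registration RPC that times out before it can even be sent, so basic reachability of that listener from a broker host is worth ruling out first. A quick sketch (assuming `nc` is available on the broker hosts):
> {code:java}
> # From a broker host: check that every controller accepts TCP connections on the CONTROLLER port
> for c in prodkafkacontr11z1 prodkafkacontr12z1 prodkafkacontr13z1 \
>          prodkafkacontr17z3 prodkafkacontr18z3 prodkafkacontr19z3 \
>          prodkafkacontr14z4 prodkafkacontr15z4 prodkafkacontr16z4; do
>   nc -vz "${c}.h.ae-rus.net" 9091
> done
> {code}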
> The config on the broker node is as follows:
> {code:java}
> $ cat /etc/kafka/server.properties
> ##
> # Ansible managed
> #host.name=prodkafka2z1.h.ae-rus.net
> cluster.name=MrgOne
> node.id=2
> broker.rack=z1
> replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
> process.roles=broker
> delete.topic.enable=True
> auto.create.topics.enable=True
> listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> listeners=EXTERNAL://:9092,INTERNAL://:29092
> advertised.listeners=EXTERNAL://prod-kafka-12.ae-rus.net:9092,INTERNAL://prod-kafka-12.ae-rus.net:29092
> controller.listener.names=CONTROLLER
> inter.broker.listener.name=EXTERNAL
> controller.quorum.voters=1...@prodkafkacontr11z1.h.ae-rus.net:9091,1...@prodkafkacontr12z1.h.ae-rus.net:9091,1...@prodkafkacontr13z1.h.ae-rus.net:9091,1...@prodkafkacontr17z3.h.ae-rus.net:9091,1...@prodkafkacontr18z3.h.ae-rus.net:9091,1...@prodkafkacontr19z3.h.ae-rus.net:9091,1...@prodkafkacontr14z4.h.ae-rus.net:9091,1...@prodkafkacontr15z4.h.ae-rus.net:9091,1...@prodkafkacontr16z4.h.ae-rus.net:9091
> controller.quorum.bootstrap.servers=prodkafkacontr11z1.h.ae-rus.net:9091,prodkafkacontr12z1.h.ae-rus.net:9091,prodkafkacontr13z1.h.ae-rus.net:9091,prodkafkacontr17z3.h.ae-rus.net:9091,prodkafkacontr18z3.h.ae-rus.net:9091,prodkafkacontr19z3.h.ae-rus.net:9091,prodkafkacontr14z4.h.ae-rus.net:9091,prodkafkacontr15z4.h.ae-rus.net:9091,prodkafkacontr16z4.h.ae-rus.net:9091
> num.network.threads=128
> background.threads=128
> num.io.threads=128
> socket.send.buffer.bytes=2097152
> socket.receive.buffer.bytes=2097152
> socket.request.max.bytes=104857600
> queued.max.requests=32768
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> log.dirs=/data/kafka1/kafka,/data/kafka2/kafka,/data/kafka3/kafka,/data/kafka4/kafka,/data/kafka5/kafka,/data/kafka6/kafka
> num.recovery.threads.per.data.dir=16
> log.flush.interval.messages=10240
> log.flush.interval.ms=5000
> log.retention.hours=48
> log.retention.bytes=1073741824
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> group.initial.rebalance.delay.ms=1000
> auto.leader.rebalance.enable=True
> leader.imbalance.check.interval.seconds=300
> leader.imbalance.per.broker.percentage=20
> default.replication.factor=3
> num.replica.fetchers=64
> replica.fetch.max.bytes=41943040
> replica.fetch.wait.max.ms=1000
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30000
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=10000
> ############################# Extra Properties #############################
> offsets.topic.num.partitions=150
> message.max.bytes=20971520
> batch.size=65536
> zookeeper.connect=prod-zk-11.ae-rus.net:2181,prod-zk-12.ae-rus.net:2181,prod-zk-13.ae-rus.net:2181,prod-zk-14.ae-rus.net:2181,prod-zk-15.ae-rus.net:2181
> rack.aware.assignment.strategy=balance_subtopology
> rack.aware.assignment.traffic_cost=org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor
> compression.gzip.level=5
> zookeeper.metadata.migration.enable=True
> zookeeper.metadata.migration.min.batch.size=512
> initial.broker.registration.timeout.ms=18000
> {code}
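> One thing worth double-checking from the two files above is that the settings the ZooKeeper-to-KRaft migration relies on agree between controllers and brokers (same `controller.quorum.voters` IDs and endpoints, same `zookeeper.connect`, `controller.listener.names=CONTROLLER` mapped in `listener.security.protocol.map`, and `zookeeper.metadata.migration.enable=true` everywhere). A grep sketch to eyeball them per host:
> {code:java}
> # Extract only the migration-relevant keys for a side-by-side comparison across hosts
> grep -E '^(process\.roles|node\.id|zookeeper\.connect|zookeeper\.metadata\.migration\.enable|controller\.quorum|controller\.listener\.names|listener\.security\.protocol\.map|inter\.broker\.listener\.name)' /etc/kafka/server.properties
> {code}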
> When we try to move to the next step (`Migrating brokers to KRaft`), the broker fails 
> to register with the controller quorum and crashes.
> {code:java}
> ==> /var/log/kafka/kafka-server.log <==
> [2025-05-20 15:24:07,521] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2025-05-20 15:24:07,826] INFO [BrokerServer id=2] Transition from SHUTDOWN 
> to STARTING (kafka.server.BrokerServer)
> [2025-05-20 15:24:07,827] INFO [SharedServer id=2] Starting SharedServer 
> (kafka.server.SharedServer)
> [2025-05-20 15:24:07,884] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Recovering unflushed segment 21124. 0/1 recovered for 
> __cluster_metadata-0. (kafka.log.LogLoader)
> [2025-05-20 15:24:07,885] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Loading producer state till offset 21124 with message 
> format version 2 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,886] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
> producer state from offset 21124 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,893] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Producer state recovery took 0ms for snapshot load 
> and 7ms for segment recovery from offset 21124 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,958] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Loading producer state till offset 31424 with message 
> format version 2 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,958] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Reloading from producer snapshot and rebuilding 
> producer state from offset 31424 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,960] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/data/kafka1/kafka] Producer state recovery took 2ms for snapshot load 
> and 0ms for segment recovery from offset 31424 (kafka.log.UnifiedLog$)
> [2025-05-20 15:24:07,995] INFO Initialized snapshots with IDs 
> SortedSet(OffsetAndEpoch(offset=21124, epoch=7)) from 
> /data/kafka1/kafka/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
> [2025-05-20 15:24:08,023] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2025-05-20 15:24:08,301] INFO [BrokerServer id=2] Starting broker 
> (kafka.server.BrokerServer)
> [2025-05-20 15:24:08,318] INFO [broker-2-ThrottledChannelReaper-Fetch]: 
> Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:24:08,319] INFO [broker-2-ThrottledChannelReaper-Produce]: 
> Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:24:08,320] INFO [broker-2-ThrottledChannelReaper-Request]: 
> Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:24:08,322] INFO 
> [broker-2-ThrottledChannelReaper-ControllerMutation]: Starting 
> (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
> [2025-05-20 15:24:08,365] INFO [BrokerServer id=2] Waiting for controller 
> quorum voters future (kafka.server.BrokerServer)
> [2025-05-20 15:24:08,365] INFO [BrokerServer id=2] Finished waiting for 
> controller quorum voters future (kafka.server.BrokerServer)
> [2025-05-20 15:24:08,371] INFO 
> [broker-2-to-controller-forwarding-channel-manager]: Starting 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:08,592] INFO Updated connection-accept-rate max connection 
> creation rate to 2147483647 (kafka.network.ConnectionQuotas)
> [2025-05-20 15:24:08,687] INFO [SocketServer listenerType=BROKER, nodeId=2] 
> Created data-plane acceptor and processors for endpoint : 
> ListenerName(EXTERNAL) (kafka.network.SocketServer)
> [2025-05-20 15:24:08,688] INFO Updated connection-accept-rate max connection 
> creation rate to 2147483647 (kafka.network.ConnectionQuotas)
> [2025-05-20 15:24:08,759] INFO [SocketServer listenerType=BROKER, nodeId=2] 
> Created data-plane acceptor and processors for endpoint : 
> ListenerName(INTERNAL) (kafka.network.SocketServer)
> [2025-05-20 15:24:08,765] INFO 
> [broker-2-to-controller-alter-partition-channel-manager]: Starting 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:08,771] INFO 
> [broker-2-to-controller-directory-assignments-channel-manager]: Starting 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:08,788] INFO [ExpirationReaper-2-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,788] INFO [ExpirationReaper-2-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,789] INFO [ExpirationReaper-2-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,790] INFO [ExpirationReaper-2-ElectLeader]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,790] INFO [ExpirationReaper-2-RemoteFetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,808] INFO [ExpirationReaper-2-Heartbeat]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,808] INFO [ExpirationReaper-2-Rebalance]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,863] INFO 
> [broker-2-to-controller-heartbeat-channel-manager]: Starting 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:08,865] INFO [BrokerLifecycleManager id=2] Incarnation 
> ZvarJwVqQZOUzzobd91HlQ of broker 2 in cluster 9ca1s2tbTGm1Bi35aynf6Q is now 
> STARTING. (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:08,898] INFO [ExpirationReaper-2-AlterAcls]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2025-05-20 15:24:08,978] INFO [BrokerServer id=2] Waiting for the broker 
> metadata publishers to be installed (kafka.server.BrokerServer)
> [2025-05-20 15:24:08,979] INFO [BrokerServer id=2] Finished waiting for the 
> broker metadata publishers to be installed (kafka.server.BrokerServer)
> [2025-05-20 15:24:08,979] INFO [BrokerServer id=2] Waiting for the controller 
> to acknowledge that we are caught up (kafka.server.BrokerServer)
> {code}
> {code:java}
> ==> /var/log/kafka/kafka-server.log <==
> [2025-05-20 15:24:13,380] INFO [BrokerLifecycleManager id=2] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:17,990] INFO [BrokerLifecycleManager id=2] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:22,701] INFO [BrokerLifecycleManager id=2] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:26,864] ERROR [BrokerLifecycleManager id=2] Shutting down 
> because we were unable to register with the controller quorum. 
> (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:26,864] INFO [BrokerLifecycleManager id=2] Transitioning 
> from STARTING to SHUTTING_DOWN. (kafka.server.BrokerLifecycleManager)
> [2025-05-20 15:24:26,865] INFO 
> [broker-2-to-controller-heartbeat-channel-manager]: Shutting down 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:26,865] INFO 
> [broker-2-to-controller-heartbeat-channel-manager]: Stopped 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:26,865] INFO 
> [broker-2-to-controller-heartbeat-channel-manager]: Shutdown completed 
> (kafka.server.NodeToControllerRequestThread)
> [2025-05-20 15:24:26,865] ERROR [BrokerServer id=2] Received a fatal error 
> while waiting for the controller to acknowledge that we are caught up 
> (kafka.server.BrokerServer)
> java.util.concurrent.CancellationException
>     at 
> java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510)
>     at 
> kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:638)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:191)
>     at java.base/java.lang.Thread.run(Thread.java:1583)
> [2025-05-20 15:24:26,867] INFO Node to controller channel manager for 
> heartbeat shutdown (kafka.server.NodeToControllerChannelManagerImpl)
> [2025-05-20 15:24:26,868] INFO [BrokerServer id=2] Transition from STARTING 
> to STARTED (kafka.server.BrokerServer)
> [2025-05-20 15:24:26,870] ERROR [BrokerServer id=2] Fatal error during broker 
> startup. Prepare to shutdown (kafka.server.BrokerServer)
> java.lang.RuntimeException: Received a fatal error while waiting for the 
> controller to acknowledge that we are caught up
>     at 
> org.apache.kafka.server.util.FutureUtils.waitWithLogging(FutureUtils.java:73)
>     at kafka.server.BrokerServer.startup(BrokerServer.scala:502)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
>     at kafka.Kafka$.main(Kafka.scala:112)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.util.concurrent.CancellationException
>     at 
> java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2510)
>     at 
> kafka.server.BrokerLifecycleManager$ShutdownEvent.run(BrokerLifecycleManager.scala:638)
>     at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:191)
>     at java.base/java.lang.Thread.run(Thread.java:1583)
> [2025-05-20 15:24:26,870] INFO [BrokerServer id=2] Transition from STARTED to 
> SHUTTING_DOWN (kafka.server.BrokerServer)
> [2025-05-20 15:24:26,871] INFO [BrokerServer id=2] shutting down 
> (kafka.server.BrokerServer)
> [2025-05-20 15:24:26,871] INFO [SocketServer listenerType=BROKER, nodeId=2] 
> Stopping socket server request processors (kafka.network.SocketServer)
> [2025-05-20 15:24:26,874] ERROR Cannot invoke 
> "java.nio.channels.ServerSocketChannel.close()" because the return value of 
> "kafka.network.Acceptor.serverChannel()" is null 
> (kafka.network.DataPlaneAcceptor)
> java.lang.NullPointerException: Cannot invoke 
> "java.nio.channels.ServerSocketChannel.close()" because the return value of 
> "kafka.network.Acceptor.serverChannel()" is null
>     at kafka.network.Acceptor.$anonfun$closeAll$2(SocketServer.scala:711)
>     at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>     at kafka.network.Acceptor.closeAll(SocketServer.scala:711)
>     at kafka.network.Acceptor.close(SocketServer.scala:678)
>     at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$4(SocketServer.scala:287)
>     at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$4$adapted(SocketServer.scala:287)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
>     at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:287)
>     at kafka.server.BrokerServer.$anonfun$shutdown$5(BrokerServer.scala:638)
>     at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>     at kafka.server.BrokerServer.shutdown(BrokerServer.scala:638)
>     at kafka.server.KafkaBroker.shutdown(KafkaBroker.scala:98)
>     at kafka.server.KafkaBroker.shutdown$(KafkaBroker.scala:98)
>     at kafka.server.BrokerServer.shutdown(BrokerServer.scala:67)
>     at kafka.server.BrokerServer.startup(BrokerServer.scala:555)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
>     at kafka.Kafka$.main(Kafka.scala:112)
>     at kafka.Kafka.main(Kafka.scala)
> [2025-05-20 15:24:26,906] ERROR Cannot invoke 
> "java.nio.channels.ServerSocketChannel.close()" because the return value of 
> "kafka.network.Acceptor.serverChannel()" is null 
> (kafka.network.DataPlaneAcceptor)
> java.lang.NullPointerException: Cannot invoke 
> "java.nio.channels.ServerSocketChannel.close()" because the return value of 
> "kafka.network.Acceptor.serverChannel()" is null
>     at kafka.network.Acceptor.$anonfun$closeAll$2(SocketServer.scala:711)
>     at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>     at kafka.network.Acceptor.closeAll(SocketServer.scala:711)
>     at kafka.network.Acceptor.close(SocketServer.scala:678)
>     at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$4(SocketServer.scala:287)
>     at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$4$adapted(SocketServer.scala:287)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
>     at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:287)
>     at kafka.server.BrokerServer.$anonfun$shutdown$5(BrokerServer.scala:638)
>     at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>     at kafka.server.BrokerServer.shutdown(BrokerServer.scala:638)
>     at kafka.server.KafkaBroker.shutdown(KafkaBroker.scala:98)
>     at kafka.server.KafkaBroker.shutdown$(KafkaBroker.scala:98)
>     at kafka.server.BrokerServer.shutdown(BrokerServer.scala:67)
>     at kafka.server.BrokerServer.startup(BrokerServer.scala:555)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)
>     at 
> kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)
>     at kafka.Kafka$.main(Kafka.scala:112)
>     at kafka.Kafka.main(Kafka.scala)
> [2025-05-20 15:24:26,932] INFO [SocketServer listenerType=BROKER, nodeId=2] 
> Stopped socket server request processors (kafka.network.SocketServer)
> {code}
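> For completeness, the migration state can also be inspected on the ZooKeeper side. A sketch (assuming `zookeeper-shell.sh` can reach the ensemble; the `/migration` znode is, as far as we understand the migration design, only created once a KRaft controller has claimed leadership and started writing migration state to ZooKeeper, so it may simply not exist here):
> {code:java}
> # Which controller currently owns the cluster; after a KRaft takeover the znode content should reference the KRaft controller
> bin/zookeeper-shell.sh prod-zk-11.ae-rus.net:2181 get /controller
> 
> # Migration progress znode written by the KRaft controller once the metadata migration is underway
> bin/zookeeper-shell.sh prod-zk-11.ae-rus.net:2181 get /migration
> {code}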


