[ https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847824#comment-17847824 ]
Christo Lolov commented on KAFKA-16790: --------------------------------------- Heya [~muralibasani], thank you for picking this up on such short notice! Here's my reasoning. The real initialisation of a RemoteLogManager happens when its startup method is called (that is what in turn calls the configure methods on the RemoteStorageManager and the RemoteLogMetadataManager). You are correct that the createRemoteLogManager method is called earlier than the creation/running of the BrokerMetadataPublisher. However, that doesn't configure the underlying managers. Let me know if there is still confusion about what the problem is! > Calls to RemoteLogManager are made before it is configured > ---------------------------------------------------------- > > Key: KAFKA-16790 > URL: https://issues.apache.org/jira/browse/KAFKA-16790 > Project: Kafka > Issue Type: Bug > Components: kraft > Affects Versions: 3.8.0 > Reporter: Christo Lolov > Priority: Major > > BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) > which in turn calls RemoteLogManager#onLeadershipChange (2); however, the > RemoteLogManager is configured after the BrokerMetadataPublisher starts > running (3, 4). This is incorrect: we either need to initialise the > RemoteLogManager before we start the BrokerMetadataPublisher or we need to > skip calls to onLeadershipChange if the RemoteLogManager is not initialised. 
> (1) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151] > (2) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737] > (3) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432] > (4) > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515] > The problem can be reproduced by applying the following changes: > {code:java} > config/kraft/broker.properties | 10 ++++++++++ > .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++++++- > core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++++- > 3 files changed, 22 insertions(+), 2 deletions(-) > diff --git > a/config/kraft/broker.properties b/config/kraft/broker.properties > index 2d15997f28..39d126cf87 100644 > --- a/config/kraft/broker.properties > +++ b/config/kraft/broker.properties > @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=300000 > + > +remote.log.storage.system.enable=true > +remote.log.metadata.manager.listener.name=PLAINTEXT > +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage > +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar > +remote.log.storage.manager.impl.prefix=rsm.config. > +remote.log.metadata.manager.impl.prefix=rlmm.config. 
> +rsm.config.dir=/tmp/kafka-remote-storage > +rlmm.config.remote.log.metadata.topic.replication.factor=1 > +log.retention.check.interval.ms=1000 > diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > index 6555b7c0cd..e84a072abc 100644 > --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java > +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java > @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { > // The endpoint for remote log metadata manager to connect to > private Optional<EndPoint> endpoint = Optional.empty(); > private boolean closed = false; > + private boolean up = false; > > /** > * Creates RemoteLogManager instance with the given arguments. > @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { > // in connecting to the brokers or remote storages. > configureRSM(); > configureRLMM(); > + up = true; > } > > public RemoteStorageManager storageManager() { > @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { > public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, > Set<Partition> partitionsBecomeFollower, > Map<String, Uuid> topicIds) { > - LOGGER.debug("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > + if (!up) { > + LOGGER.error("NullPointerException"); > + return; > + } > + LOGGER.error("Received leadership changes for leaders: {} and > followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); > > Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = > filterPartitions(partitionsBecomeLeader) > .collect(Collectors.toMap( > diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala > b/core/src/main/scala/kafka/server/ReplicaManager.scala > index 35499430d6..bd3f41c3d6 100644 > --- a/core/src/main/scala/kafka/server/ReplicaManager.scala > +++ 
b/core/src/main/scala/kafka/server/ReplicaManager.scala > @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig, > */ > def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { > // Before taking the lock, compute the local changes > + stateChangeLogger.error("ROBIN") > val localChanges = delta.localChanges(config.nodeId) > val metadataVersion = newImage.features().metadataVersion() > > @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig, > replicaFetcherManager.shutdownIdleFetcherThreads() > replicaAlterLogDirsManager.shutdownIdleFetcherThreads() > > - remoteLogManager.foreach(rlm => > rlm.onLeadershipChange(leaderChangedPartitions.asJava, > followerChangedPartitions.asJava, localChanges.topicIds())) > + remoteLogManager.foreach(rlm => { > + stateChangeLogger.error("JOKER") > + rlm.onLeadershipChange(leaderChangedPartitions.asJava, > followerChangedPartitions.asJava, localChanges.topicIds()) > + }) > } > > if (metadataVersion.isDirectoryAssignmentSupported) { {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)