[ 
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov reassigned KAFKA-16790:
-------------------------------------

    Assignee: Muralidhar Basani

> Calls to RemoteLogManager are made before it is configured
> ----------------------------------------------------------
>
>                 Key: KAFKA-16790
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16790
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>    Affects Versions: 3.8.0
>            Reporter: Christo Lolov
>            Assignee: Muralidhar Basani
>            Priority: Major
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1),
> which in turn calls RemoteLogManager#onLeadershipChange (2). However, the
> RemoteLogManager is configured only after the BrokerMetadataPublisher starts
> running (3, 4). This is incorrect: we need to either initialise the
> RemoteLogManager before we start the BrokerMetadataPublisher or skip calls
> to onLeadershipChange while the RemoteLogManager is not yet initialised.
> (1) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4) 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The problem can be reproduced with the following changes:
> {code:java}
>  config/kraft/broker.properties                         | 10 ++++++++++
>  .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++++++-
>  core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +++++-
>  3 files changed, 22 insertions(+), 2 deletions(-)
> diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
>  # The interval at which log segments are checked to see if they can be 
> deleted according
>  # to the retention policies
>  log.retention.check.interval.ms=300000
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
>      // The endpoint for remote log metadata manager to connect to
>      private Optional<EndPoint> endpoint = Optional.empty();
>      private boolean closed = false;
> +    private boolean up = false;
>  
>      /**
>       * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
>          // in connecting to the brokers or remote storages.
>          configureRSM();
>          configureRLMM();
> +        up = true;
>      }
>  
>      public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
>      public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
>                                     Set<Partition> partitionsBecomeFollower,
>                                     Map<String, Uuid> topicIds) {
> -        LOGGER.debug("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> +        if (!up) {
> +            LOGGER.error("NullPointerException");
> +            return;
> +        }
> +        LOGGER.error("Received leadership changes for leaders: {} and 
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>  
>          Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = 
> filterPartitions(partitionsBecomeLeader)
>                  .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
> @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig,
>     */
>    def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
>      // Before taking the lock, compute the local changes
> +    stateChangeLogger.error("ROBIN")
>      val localChanges = delta.localChanges(config.nodeId)
>      val metadataVersion = newImage.features().metadataVersion()
>  
> @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig,
>          replicaFetcherManager.shutdownIdleFetcherThreads()
>          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
>  
> -        remoteLogManager.foreach(rlm => 
> rlm.onLeadershipChange(leaderChangedPartitions.asJava, 
> followerChangedPartitions.asJava, localChanges.topicIds()))
> +        remoteLogManager.foreach(rlm => {
> +          stateChangeLogger.error("JOKER")
> +          rlm.onLeadershipChange(leaderChangedPartitions.asJava, 
> followerChangedPartitions.asJava, localChanges.topicIds())
> +        })
>        }
>  
>        if (metadataVersion.isDirectoryAssignmentSupported) {
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to