This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07490b929bd KAFKA-15365: Broker-side replica management changes 
(#14881)
07490b929bd is described below

commit 07490b929bdf923195632bed7b494aa7d22d9f31
Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com>
AuthorDate: Mon Dec 11 14:34:22 2023 +0000

    KAFKA-15365: Broker-side replica management changes (#14881)
    
    Reviewers: Igor Soarez <soa...@apple.com>, Ron Dagostino 
<rndg...@gmail.com>, Proven Provenzano <pprovenz...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  40 ++--
 core/src/main/scala/kafka/log/LogManager.scala     |  24 ++-
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  58 +++---
 .../unit/kafka/cluster/PartitionLockTest.scala     |   4 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  10 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  76 +++++--
 .../unit/kafka/server/ReplicaManagerTest.scala     | 221 +++++++++++++++++----
 .../unit/kafka/server/ServerStartupTest.scala      |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |   4 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   2 +-
 .../apache/kafka/image/LocalReplicaChanges.java    |  10 +-
 .../java/org/apache/kafka/image/TopicDelta.java    |  20 +-
 .../java/org/apache/kafka/image/TopicsDelta.java   |   5 +-
 19 files changed, 361 insertions(+), 132 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index b3e7fecef8a..094e3cda419 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -30,7 +30,7 @@ import kafka.server.metadata.{KRaftMetadataCache, 
ZkMetadataCache}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zookeeper.ZooKeeperClientException
-import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
 import org.apache.kafka.common.message.{DescribeProducersResponseData, 
FetchResponseData}
@@ -42,7 +42,6 @@ import org.apache.kafka.common.record.{MemoryRecords, 
RecordBatch}
 import org.apache.kafka.common.requests._
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, 
LogStartOffsetIncrementReason, VerificationGuard}
@@ -85,7 +84,6 @@ trait AlterPartitionListener {
   def markIsrExpand(): Unit
   def markIsrShrink(): Unit
   def markFailed(): Unit
-  def assignDir(dir: String): Unit
 }
 
 class DelayedOperations(topicPartition: TopicPartition,
@@ -120,10 +118,6 @@ object Partition {
       }
 
       override def markFailed(): Unit = 
replicaManager.failedIsrUpdatesRate.mark()
-
-      override def assignDir(dir: String): Unit = {
-        replicaManager.maybeNotifyPartitionAssignedToDirectory(topicPartition, 
dir)
-      }
     }
 
     val delayedOperations = new DelayedOperations(
@@ -447,7 +441,8 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
+  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid],
+                           targetLogDirectoryId: Option[Uuid] = None): Unit = {
     def maybeCreate(logOpt: Option[UnifiedLog]): UnifiedLog = {
       logOpt match {
         case Some(log) =>
@@ -456,7 +451,7 @@ class Partition(val topicPartition: TopicPartition,
             topicId.foreach(log.assignTopicId)
           log
         case None =>
-          createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+          createLog(isNew, isFutureReplica, offsetCheckpoints, topicId, 
targetLogDirectoryId)
       }
     }
 
@@ -468,7 +463,8 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   // Visible for testing
-  private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
+  private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints,
+                                 topicId: Option[Uuid], targetLogDirectoryId: 
Option[Uuid]): UnifiedLog = {
     def updateHighWatermark(log: UnifiedLog): Unit = {
       val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, 
topicPartition).getOrElse {
         info(s"No checkpointed highwatermark is found for partition 
$topicPartition")
@@ -481,11 +477,10 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[UnifiedLog] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId, targetLogDirectoryId)
       if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener)
       maybeLog = Some(log)
       updateHighWatermark(log)
-      alterPartitionListener.assignDir(log.parentDir)
       log
     } finally {
       logManager.finishedInitializingLog(topicPartition, maybeLog)
@@ -658,7 +653,6 @@ class Partition(val topicPartition: TopicPartition,
     // need to hold the lock to prevent appendMessagesToLeader() from hitting 
I/O exceptions due to log being deleted
     inWriteLock(leaderIsrUpdateLock) {
       clear()
-
       listeners.forEach { listener =>
         listener.onDeleted(topicPartition)
       }
@@ -703,7 +697,8 @@ class Partition(val topicPartition: TopicPartition,
    */
   def makeLeader(partitionState: LeaderAndIsrPartitionState,
                  highWatermarkCheckpoints: OffsetCheckpoints,
-                 topicId: Option[Uuid]): Boolean = {
+                 topicId: Option[Uuid],
+                 targetDirectoryId: Option[Uuid] = None): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       // Partition state changes are expected to have an partition epoch 
larger or equal
       // to the current partition epoch. The latter is allowed because the 
partition epoch
@@ -745,7 +740,7 @@ class Partition(val topicPartition: TopicPartition,
       )
 
       try {
-        createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId)
+        createLogInAssignedDirectoryId(partitionState, 
highWatermarkCheckpoints, topicId, targetDirectoryId)
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
@@ -819,7 +814,8 @@ class Partition(val topicPartition: TopicPartition,
    */
   def makeFollower(partitionState: LeaderAndIsrPartitionState,
                    highWatermarkCheckpoints: OffsetCheckpoints,
-                   topicId: Option[Uuid]): Boolean = {
+                   topicId: Option[Uuid],
+                   targetLogDirectoryId: Option[Uuid] = None): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       if (partitionState.partitionEpoch < partitionEpoch) {
         stateChangeLogger.info(s"Skipped the become-follower state change for 
$topicPartition with topic id $topicId " +
@@ -849,7 +845,7 @@ class Partition(val topicPartition: TopicPartition,
       )
 
       try {
-        createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId)
+        createLogInAssignedDirectoryId(partitionState, 
highWatermarkCheckpoints, topicId, targetLogDirectoryId)
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred. 
makeFollower will be skipping the " +
@@ -875,6 +871,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, 
isFutureReplica = false, highWatermarkCheckpoints, topicId, 
targetLogDirectoryId)
+
+      case None =>
+        createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId)
+    }
+  }
+
   /**
    * Update the follower's state in the leader based on the last fetch 
request. See
    * [[Replica.updateFetchStateOrThrow()]] for details.
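
To make the new creation path concrete, here is a hedged caller-side sketch (not
part of the patch; state, checkpoints, topicId and assignedDirId are assumed
stand-ins for the real call-site values). Passing a directory id routes log
creation through createLogInAssignedDirectoryId, where the UNASSIGNED sentinel
marks the replica as new:

    // Sketch only: leader promotion with the new optional directory id.
    // A Some(...) value is forwarded to LogManager so the log lands in that
    // directory; None keeps the pre-existing partitionState.isNew behaviour.
    val becameLeader: Boolean = partition.makeLeader(
      state,                                   // LeaderAndIsrPartitionState from the metadata delta
      checkpoints,                             // OffsetCheckpoints used to restore the high watermark
      topicId = Some(topicId),
      targetDirectoryId = Some(assignedDirId)  // DirectoryId.UNASSIGNED here means "create as new"
    )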
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 4f316e1f960..f84177d9717 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,7 +25,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.ConfigRepository
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, 
KafkaStorageException, LogDirNotFoundException}
 
@@ -123,7 +123,9 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = 
loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), 
logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -216,6 +218,7 @@ class LogManager(logDirs: Seq[File],
     warn(s"Stopping serving logs in dir $dir")
     logCreationOrDeletionLock synchronized {
       _liveLogDirs.remove(new File(dir))
+      directoryIds.remove(dir)
       if (_liveLogDirs.isEmpty) {
         fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", 
")} have failed")
         Exit.halt(1)
@@ -278,7 +281,7 @@ class LogManager(logDirs: Seq[File],
    * If meta.properties does not include a directory ID, one is generated and 
persisted back to meta.properties.
    * Directories without a meta.properties don't get a directory ID assigned.
    */
-  private def loadDirectoryIds(logDirs: Seq[File]): Map[String, Uuid] = {
+  private def loadDirectoryIds(logDirs: Seq[File]): mutable.Map[String, Uuid] 
= {
     val result = mutable.HashMap[String, Uuid]()
     logDirs.foreach(logDir => {
       try {
@@ -991,10 +994,14 @@ class LogManager(logDirs: Seq[File],
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should 
be returned or created
    * @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory id that should host the partition's log.
+   *                             The next available directory is selected if this is None or equal to {@link DirectoryId#UNASSIGNED}.
+   *                             The method assumes the provided id belongs to an online directory.
    * @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
    * @throws InconsistentTopicIdException if the topic ID in the log does not 
match the topic ID provided
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false,
+                     topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] 
= Option.empty): UnifiedLog = {
     logCreationOrDeletionLock synchronized {
       val log = getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
@@ -1002,7 +1009,14 @@ class LogManager(logDirs: Seq[File],
           throw new KafkaStorageException(s"Can not create log for 
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are 
offline")
 
         val logDirs: List[File] = {
-          val preferredLogDir = preferredLogDirs.get(topicPartition)
+          val preferredLogDir = targetLogDirectoryId.filterNot(Seq(DirectoryId.UNASSIGNED, DirectoryId.LOST).contains) match {
+            case Some(targetId) if 
!preferredLogDirs.containsKey(topicPartition) =>
+              // If the partition is configured with both a targetLogDirectoryId and a
+              // preferredLogDirs entry, the preferredLogDirs entry takes precedence;
+              // otherwise targetLogDirectoryId is used
+              directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)
+            case _ =>
+              preferredLogDirs.get(topicPartition)
+          }
 
           if (isFuture) {
             if (preferredLogDir == null)
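
As a usage illustration (not from the patch; logManager and dirId are assumed to
be in scope), the new parameter lets a caller pin a partition's log to the
directory whose id is recorded in that directory's meta.properties:

    // Hedged sketch: create the log for foo-0 in the directory identified by dirId.
    // Per the javadoc above, the id is ignored if it is UNASSIGNED or LOST, and an
    // existing preferredLogDirs entry for the partition takes precedence.
    val log = logManager.getOrCreateLog(
      new TopicPartition("foo", 0),
      isNew = true,
      isFuture = false,
      topicId = Some(Uuid.randomUuid()),
      targetLogDirectoryId = Some(dirId)
    )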
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index fccceadbcec..6701481a7c6 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -210,7 +210,7 @@ class BrokerServer(
         time,
         s"broker-${config.nodeId}-",
         isZkBroker = false,
-        logDirs = logManager.directoryIds.values.toSet)
+        logDirs = logManager.directoryIdsSet)
 
       // Enable delegation token cache for all SCRAM mechanisms to simplify 
dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled 
dynamically.
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1699842d10f..1d3275632f0 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -417,7 +417,7 @@ class KafkaServer(
             time,
             s"zk-broker-${config.nodeId}-",
             isZkBroker = true,
-            logManager.directoryIds.values.toSet)
+            logManager.directoryIdsSet)
 
           // If the ZK broker is in migration mode, start up a RaftManager to 
learn about the new KRaft controller
           val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0e31fbb415..60f69caad4e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, 
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, 
IsrShrinksPerSecMetricName, LeaderCountMetricName, 
OfflineReplicaCountMetricName, PartitionCountMetricName, 
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, 
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, 
UnderReplicatedPartitionsMetricName}
 import kafka.server.ReplicaManager.createLogReadResult
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, 
OffsetCheckpoints}
-import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
+import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: 
Option[AddPartitionsToTxnManager] = None,
-                     directoryEventHandler: DirectoryEventHandler = 
DirectoryEventHandler.NOOP
+                     val directoryEventHandler: DirectoryEventHandler = 
DirectoryEventHandler.NOOP
                      ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -2352,32 +2352,6 @@ class ReplicaManager(val config: KafkaConfig,
     warn(s"Stopped serving replicas in dir $dir")
   }
 
-  /**
-   * Called when a topic partition is placed in a log directory.
-   * If a directory event listener is configured,
-   * and if the selected log directory has an assigned Uuid,
-   * then the listener is notified of the assignment.
-   */
-  def maybeNotifyPartitionAssignedToDirectory(tp: TopicPartition, dir: 
String): Unit = {
-    if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
-      logManager.directoryId(dir) match {
-        case None => throw new IllegalStateException(s"Assignment into 
unidentified directory: ${dir}")
-        case Some(dirId) =>
-          getPartition(tp) match {
-            case HostedPartition.Offline | HostedPartition.None =>
-              throw new IllegalStateException("Assignment for a partition that 
is not online")
-            case HostedPartition.Online(partition) => partition.topicId match {
-              case None =>
-                throw new IllegalStateException(s"Assignment for topic without 
ID: ${tp.topic()}")
-              case Some(topicId) =>
-                val topicIdPartition = new common.TopicIdPartition(topicId, 
tp.partition())
-                directoryEventHandler.handleAssignment(topicIdPartition, 
dirId, () => ())
-            }
-          }
-        }
-    }
-  }
-
   def removeMetrics(): Unit = {
     ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
   }
@@ -2598,10 +2572,10 @@ class ReplicaManager(val config: KafkaConfig,
         val leaderChangedPartitions = new mutable.HashSet[Partition]
         val followerChangedPartitions = new mutable.HashSet[Partition]
         if (!localChanges.leaders.isEmpty) {
-          applyLocalLeadersDelta(leaderChangedPartitions, delta, 
lazyOffsetCheckpoints, localChanges.leaders.asScala)
+          applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, 
lazyOffsetCheckpoints, localChanges.leaders.asScala, 
localChanges.directoryIds.asScala)
         }
         if (!localChanges.followers.isEmpty) {
-          applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, 
lazyOffsetCheckpoints, localChanges.followers.asScala)
+          applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, 
lazyOffsetCheckpoints, localChanges.followers.asScala, 
localChanges.directoryIds.asScala)
         }
 
         maybeAddLogDirFetchers(leaderChangedPartitions ++ 
followerChangedPartitions, lazyOffsetCheckpoints,
@@ -2612,14 +2586,18 @@ class ReplicaManager(val config: KafkaConfig,
 
         remoteLogManager.foreach(rlm => 
rlm.onLeadershipChange(leaderChangedPartitions.asJava, 
followerChangedPartitions.asJava, localChanges.topicIds()))
       }
+
+      localChanges.directoryIds.forEach(maybeUpdateTopicAssignment)
     }
   }
 
   private def applyLocalLeadersDelta(
     changedPartitions: mutable.Set[Partition],
+    newImage: MetadataImage,
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
-    localLeaders: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo]
+    localLeaders: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo],
+    directoryIds: mutable.Map[TopicIdPartition, Uuid]
   ): Unit = {
     stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) 
to " +
       "local leaders.")
@@ -2628,7 +2606,9 @@ class ReplicaManager(val config: KafkaConfig,
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
         try {
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
+          val partitionAssignedDirectoryId = 
directoryIds.find(_._1.topicPartition() == tp).map(_._2)
+          partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), 
partitionAssignedDirectoryId)
+
           changedPartitions.add(partition)
         } catch {
           case e: KafkaStorageException =>
@@ -2649,7 +2629,8 @@ class ReplicaManager(val config: KafkaConfig,
     newImage: MetadataImage,
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
-    localFollowers: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo]
+    localFollowers: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo],
+    directoryIds: mutable.Map[TopicIdPartition, Uuid]
   ): Unit = {
     stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) 
to " +
       "local followers.")
@@ -2667,7 +2648,8 @@ class ReplicaManager(val config: KafkaConfig,
           //   is unavailable. This is required to ensure that we include the 
partition's
           //   high watermark in the checkpoint file (see KAFKA-1647).
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
+          val partitionAssignedDirectoryId = 
directoryIds.find(_._1.topicPartition() == tp).map(_._2)
+          val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
 
           if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
               !info.partition.isr.contains(config.brokerId))) {
@@ -2742,6 +2724,14 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def maybeUpdateTopicAssignment(partition: TopicIdPartition, 
partitionDirectoryId: Uuid): Unit = {
+    for {
+      topicPartitionActualLog <- logManager.getLog(partition.topicPartition(), 
false)
+      topicPartitionActualDirectoryId <- 
logManager.directoryId(topicPartitionActualLog.dir.getParent)
+      if partitionDirectoryId != topicPartitionActualDirectoryId
+    } directoryEventHandler.handleAssignment(new 
common.TopicIdPartition(partition.topicId, partition.partition()), 
topicPartitionActualDirectoryId, () => ())
+  }
+
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
     stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { 
(topicPartition, exception) =>
       exception match {
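
The new maybeUpdateTopicAssignment above is a for-comprehension with a guard;
desugared into explicit steps it reads roughly as the sketch below, using the
same parameter names as the method:

    // Sketch: reconcile the controller's claimed directory with where the log really lives.
    logManager.getLog(partition.topicPartition(), isFuture = false).foreach { log =>
      logManager.directoryId(log.dir.getParent).foreach { actualDirId =>
        if (partitionDirectoryId != actualDirId)
          // Tell the controller which directory actually hosts the replica.
          directoryEventHandler.handleAssignment(
            new common.TopicIdPartition(partition.topicId, partition.partition()), actualDirId, () => ())
      }
    }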
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 445371c108b..054d1690d80 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -293,8 +293,8 @@ class PartitionLockTest extends Logging {
         }
       }
 
-      override def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
-        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, 
None)
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], 
targetLogDirectoryId: Option[Uuid]): UnifiedLog = {
+        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, 
None, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, 
log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b908fcc5e59..9180147618f 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -427,8 +427,8 @@ class PartitionTest extends AbstractPartitionTest {
       logManager,
       alterPartitionManager) {
 
-      override def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
-        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, 
None)
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], 
targetLogDirectoryId: Option[Uuid]): UnifiedLog = {
+        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, 
None, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, 
log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
@@ -2994,7 +2994,7 @@ class PartitionTest extends AbstractPartitionTest {
       spyLogManager,
       alterPartitionManager)
 
-    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None, targetLogDirectoryId = None)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -3033,7 +3033,7 @@ class PartitionTest extends AbstractPartitionTest {
       spyLogManager,
       alterPartitionManager)
 
-    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None, targetLogDirectoryId = None)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -3075,7 +3075,7 @@ class PartitionTest extends AbstractPartitionTest {
       spyLogManager,
       alterPartitionManager)
 
-    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+    partition.createLog(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None, targetLogDirectoryId = None)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index e14e2eb8e07..b2a16b5bfa6 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -26,7 +26,7 @@ import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -93,6 +93,54 @@ class LogManagerTest {
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
   }
 
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in the given log directory (selected by directory id) and that we can append to the new log.
+   */
+  @Test
+  def testCreateLogOnTargetedLogDirectory(): Unit = {
+    val targetedLogDirectoryId = DirectoryId.random()
+
+    val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
+    writeMetaProperties(dirs(0))
+    writeMetaProperties(dirs(1), Optional.of(targetedLogDirectoryId))
+    writeMetaProperties(dirs(3), Optional.of(DirectoryId.random()))
+    writeMetaProperties(dirs(4))
+
+    logManager = createLogManager(dirs)
+
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = 
None, targetLogDirectoryId = Some(targetedLogDirectoryId))
+    assertEquals(5, logManager.liveLogDirs.size)
+
+    val logFile = new File(dirs(1), name + "-0")
+    assertTrue(logFile.exists)
+    assertEquals(dirs(1).getAbsolutePath, logFile.getParent)
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in the next selected log directory if the given directory id is DirectoryId.UNASSIGNED.
+   */
+  @Test
+  def testCreateLogWithTargetedLogDirectorySetAsUnassigned(): Unit = {
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = 
None, targetLogDirectoryId = Some(DirectoryId.UNASSIGNED))
+    assertEquals(1, logManager.liveLogDirs.size)
+    val logFile = new File(logDir, name + "-0")
+    assertTrue(logFile.exists)
+    assertNotEquals(Some(DirectoryId.UNASSIGNED), logManager.directoryId(logFile.getParent))
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
+  }
+
+  @Test
+  def 
testCreateLogWithTargetedLogDirectorySetAsUnknownWithoutAnyOfflineDirectories():
 Unit = {
+
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = 
None, targetLogDirectoryId = Some(DirectoryId.LOST))
+    assertEquals(1, logManager.liveLogDirs.size)
+    val logFile = new File(logDir, name + "-0")
+    assertTrue(logFile.exists)
+    assertNotEquals(Some(DirectoryId.LOST), logManager.directoryId(logFile.getParent))
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
+  }
+
   /**
    * Tests that all internal futures are completed before 
LogManager.shutdown() returns to the
    * caller during error situations.
@@ -1044,19 +1092,6 @@ class LogManagerTest {
 
   @Test
   def testLoadDirectoryIds(): Unit = {
-    def writeMetaProperties(
-      dir: File,
-      directoryId: Optional[Uuid] = Optional.empty()
-    ): Unit = {
-      val metaProps = new MetaProperties.Builder().
-        setVersion(MetaPropertiesVersion.V0).
-        setClusterId("IVT1Seu3QjacxS7oBTKhDQ").
-        setNodeId(1).
-        setDirectoryId(directoryId).
-        build()
-      PropertiesUtils.writePropertiesFile(metaProps.toProperties,
-        new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
-    }
     val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
     writeMetaProperties(dirs(0))
     writeMetaProperties(dirs(1), 
Optional.of(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")))
@@ -1072,6 +1107,17 @@ class LogManagerTest {
     assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
     assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), 
logManager.directoryId(dirs(3).getAbsolutePath))
     assertTrue(logManager.directoryId(dirs(3).getAbsolutePath).isDefined)
-    assertEquals(2, logManager.directoryIds.size)
+    assertEquals(2, logManager.directoryIdsSet.size)
+  }
+
+  def writeMetaProperties(dir: File, directoryId: Optional[Uuid] = 
Optional.empty()): Unit = {
+    val metaProps = new MetaProperties.Builder().
+      setVersion(MetaPropertiesVersion.V0).
+      setClusterId("IVT1Seu3QjacxS7oBTKhDQ").
+      setNodeId(1).
+      setDirectoryId(directoryId).
+      build()
+    PropertiesUtils.writePropertiesFile(metaProps.toProperties,
+      new File(dir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
   }
 }
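
For reference, the relocated writeMetaProperties helper produces a
meta.properties file of roughly this shape (a sketch based on the MetaProperties
builder above and the KIP-858 directory.id property; the directory.id line is
only written when an id is supplied):

    version=0
    broker.id=1
    cluster.id=IVT1Seu3QjacxS7oBTKhDQ
    directory.id=ZwkGXjB0TvSF6mjVh6gO7Q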
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a4ececf2197..e40fd90f318 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -52,11 +52,11 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, 
TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, 
TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
@@ -68,6 +68,7 @@ import org.junit.jupiter.params.provider.{EnumSource, 
ValueSource}
 import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
+import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.raft.RaftConfig
 import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
@@ -76,7 +77,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyMap, anySet, 
anyString}
-import org.mockito.Mockito.{doAnswer, doReturn, mock, mockConstruction, never, 
reset, spy, times, verify, verifyNoMoreInteractions, when}
+import org.mockito.Mockito.{atLeastOnce, doAnswer, doReturn, mock, 
mockConstruction, never, reset, spy, times, verify, verifyNoInteractions, 
verifyNoMoreInteractions, when}
 
 import scala.collection.{Map, Seq, mutable}
 import scala.compat.java8.OptionConverters.RichOptionForJava8
@@ -2779,7 +2780,7 @@ class ReplicaManagerTest {
     val topicPartitionObj = new TopicPartition(topic, topicPartition)
     val mockLogMgr: LogManager = mock(classOf[LogManager])
     when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new 
File(_).getAbsoluteFile))
-    when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), 
ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), 
any())).thenReturn(mockLog)
+    when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), 
ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any(), 
any())).thenReturn(mockLog)
     when(mockLogMgr.getLog(topicPartitionObj, isFuture = 
false)).thenReturn(Some(mockLog))
     when(mockLogMgr.getLog(topicPartitionObj, isFuture = 
true)).thenReturn(None)
     val allLogs = new Pool[TopicPartition, UnifiedLog]()
@@ -3119,7 +3120,9 @@ class ReplicaManagerTest {
     enableRemoteStorage: Boolean = false,
     shouldMockLog: Boolean = false,
     remoteLogManager: Option[RemoteLogManager] = None,
-    defaultTopicRemoteLogStorageEnable: Boolean = true
+    defaultTopicRemoteLogStorageEnable: Boolean = true,
+    setupLogDirMetaProperties: Boolean = false,
+    directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
@@ -3132,6 +3135,19 @@ class ReplicaManagerTest {
       logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     }
     val mockLog = setupMockLog(path1)
+    if (setupLogDirMetaProperties) {
+      // add meta.properties file in each dir
+      config.logDirs.foreach(dir => {
+        val metaProps = new MetaProperties.Builder().
+          setVersion(MetaPropertiesVersion.V0).
+          setClusterId("clusterId").
+          setNodeId(brokerId).
+          setDirectoryId(DirectoryId.random()).
+          build()
+        PropertiesUtils.writePropertiesFile(metaProps.toProperties,
+          new File(new File(dir), 
MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false)
+      })
+    }
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else 
None, remoteStorageSystemEnable = enableRemoteStorage)
     val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, 
s"host$brokerId", brokerId))
 
@@ -3175,6 +3191,7 @@ class ReplicaManagerTest {
       delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
       threadNamePrefix = Option(this.getClass.getName),
       addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
+      directoryEventHandler = directoryEventHandler,
       remoteLogManager = if (enableRemoteStorage) {
         if (remoteLogManager.isDefined)
           remoteLogManager
@@ -4408,6 +4425,138 @@ class ReplicaManagerTest {
     assertEquals(followerPartitions, actualFollowerPartitions)
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory(enableRemoteStorage: 
Boolean): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicPartition("foo", 0)
+    val topicPartition1 = new TopicPartition("foo", 1)
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId,
+      enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = 
true, directoryEventHandler = directoryEventHandler)
+
+    try {
+
+      // Test applying delta as leader
+      val directoryIds = replicaManager.logManager.directoryIdsSet.toList
+      // Make the local replica the leader
+      val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, 
directoryIds = directoryIds)
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
+
+      // Check that the broker did not update the controller, since the replica is already in the assigned directory.
+      verifyNoInteractions(replicaManager.directoryEventHandler)
+      val logDirIdHostingPartition0 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get
+      assertEquals(directoryIds.head, logDirIdHostingPartition0)
+
+      // Test applying delta as follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = directoryIds)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      // Check that the broker did not update the controller, since the replica is already in the assigned directory.
+      verifyNoInteractions(replicaManager.directoryEventHandler)
+      val logDirIdHostingPartition1 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get
+      assertEquals(directoryIds.head, logDirIdHostingPartition1)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testApplyDeltaShouldHandleReplicaAssignedToUnassignedDirectory(enableRemoteStorage:
 Boolean): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicPartition("foo", 0)
+    val topicPartition1 = new TopicPartition("foo", 1)
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId,
+      enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = 
true, directoryEventHandler = directoryEventHandler)
+
+    try {
+      // Make the local replica the leader
+      val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, 
directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id
+      val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0)
+      val topicIdPartition1 = new TopicIdPartition(topicId, topicPartition1)
+      replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
+
+      // Make the local replica a follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      // Check the broker updated the controller with the correct assignment.
+      val topicIdPartitionCapture: 
ArgumentCaptor[org.apache.kafka.server.common.TopicIdPartition] =
+        
ArgumentCaptor.forClass(classOf[org.apache.kafka.server.common.TopicIdPartition])
+      val logIdCaptureForPartition: ArgumentCaptor[Uuid] = 
ArgumentCaptor.forClass(classOf[Uuid])
+      verify(replicaManager.directoryEventHandler, 
atLeastOnce()).handleAssignment(topicIdPartitionCapture.capture(), 
logIdCaptureForPartition.capture(), any())
+
+      assertEquals(
+        List(
+          new org.apache.kafka.server.common.TopicIdPartition(topicId, 
topicIdPartition0.partition()),
+          new org.apache.kafka.server.common.TopicIdPartition(topicId, 
topicIdPartition1.partition())
+        ),
+        topicIdPartitionCapture.getAllValues.asScala
+      )
+      val logDirIdHostingPartition0 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get
+      val logDirIdHostingPartition1 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get
+      assertEquals(List(logDirIdHostingPartition0, logDirIdHostingPartition1), 
logIdCaptureForPartition.getAllValues.asScala)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testApplyDeltaShouldHandleReplicaAssignedToLostDirectory(enableRemoteStorage: 
Boolean): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicPartition("foo", 0)
+    val topicPartition1 = new TopicPartition("foo", 1)
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId,
+      enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = 
true, directoryEventHandler = directoryEventHandler)
+
+    try {
+      // Make the local replica the leader
+      val leaderTopicsDelta = topicsCreateDelta(localId, true, directoryIds = 
List(DirectoryId.LOST, DirectoryId.LOST))
+      val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
+      val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id
+      val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0)
+      val topicIdPartition1 = new TopicIdPartition(topicId, topicPartition1)
+      replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
+
+      // Make the local replica a follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
+
+      // Check the broker updated the controller with the correct assignment.
+      val topicIdPartitionCapture: 
ArgumentCaptor[org.apache.kafka.server.common.TopicIdPartition] =
+        
ArgumentCaptor.forClass(classOf[org.apache.kafka.server.common.TopicIdPartition])
+      val logIdCaptureForPartition: ArgumentCaptor[Uuid] = 
ArgumentCaptor.forClass(classOf[Uuid])
+      verify(replicaManager.directoryEventHandler, 
atLeastOnce()).handleAssignment(topicIdPartitionCapture.capture(), 
logIdCaptureForPartition.capture(), any())
+
+      assertEquals(
+        List(
+          new org.apache.kafka.server.common.TopicIdPartition(topicId, 
topicIdPartition0.partition()),
+          new org.apache.kafka.server.common.TopicIdPartition(topicId, 
topicIdPartition1.partition())
+        ),
+        topicIdPartitionCapture.getAllValues.asScala
+      )
+      val logDirIdHostingPartition0 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get
+      val logDirIdHostingPartition1 = 
replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get
+      assertEquals(List(logDirIdHostingPartition0, logDirIdHostingPartition1), 
logIdCaptureForPartition.getAllValues.asScala)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testDeltaFromLeaderToFollower(enableRemoteStorage: Boolean): Unit = {
@@ -5140,17 +5289,7 @@ class ReplicaManagerTest {
       // Make the local replica the follower.
       var followerTopicsDelta = new TopicsDelta(TopicsImage.EMPTY)
       followerTopicsDelta.replay(new 
TopicRecord().setName("foo").setTopicId(FOO_UUID))
-      followerTopicsDelta.replay(new PartitionRecord()
-        .setPartitionId(0)
-        .setTopicId(FOO_UUID)
-        .setReplicas(util.Arrays.asList(localId, localId + 1))
-        .setIsr(util.Arrays.asList(localId, localId + 1))
-        .setRemovingReplicas(Collections.emptyList())
-        .setAddingReplicas(Collections.emptyList())
-        .setLeader(localId + 1)
-        .setLeaderEpoch(0)
-        .setPartitionEpoch(0)
-      )
+      followerTopicsDelta.replay(partitionRecord(localId, localId + 1))
       var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
@@ -5477,40 +5616,48 @@ class ReplicaManagerTest {
     }
   }
 
-  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): 
TopicsDelta = {
+  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, 
partition: Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)
     delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
-    delta.replay(
-      new PartitionRecord()
-        .setPartitionId(0)
-        .setTopicId(FOO_UUID)
-        .setReplicas(util.Arrays.asList(startId, startId + 1))
-        .setIsr(util.Arrays.asList(startId, startId + 1))
-        .setRemovingReplicas(Collections.emptyList())
-        .setAddingReplicas(Collections.emptyList())
-        .setLeader(leader)
-        .setLeaderEpoch(0)
-        .setPartitionEpoch(0)
-    )
+    val record = partitionRecord(startId, leader, partition)
+    if (directoryIds.nonEmpty) {
+      record.setDirectories(directoryIds.asJava)
+    }
+    delta.replay(record)
 
     delta
   }
 
+  private def partitionRecord(startId: Int, leader: Int, partition: Int = 0): 
PartitionRecord = {
+    new PartitionRecord()
+      .setPartitionId(partition)
+      .setTopicId(FOO_UUID)
+      .setReplicas(util.Arrays.asList(startId, startId + 1))
+      .setIsr(util.Arrays.asList(startId, startId + 1))
+      .setRemovingReplicas(Collections.emptyList())
+      .setAddingReplicas(Collections.emptyList())
+      .setLeader(leader)
+      .setLeaderEpoch(0)
+      .setPartitionEpoch(0)
+  }
+
   private def topicsChangeDelta(topicsImage: TopicsImage, startId: Int, 
isStartIdLeader: Boolean): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(topicsImage)
-    delta.replay(
-      new PartitionChangeRecord()
-        .setPartitionId(0)
-        .setTopicId(FOO_UUID)
-        .setReplicas(util.Arrays.asList(startId, startId + 1))
-        .setIsr(util.Arrays.asList(startId, startId + 1))
-        .setLeader(leader)
-    )
+    delta.replay(partitionChangeRecord(startId, leader))
     delta
   }
 
+  private def partitionChangeRecord(startId: Int, leader: Int) = {
+    new PartitionChangeRecord()
+      .setPartitionId(0)
+      .setTopicId(FOO_UUID)
+      .setReplicas(util.Arrays.asList(startId, startId + 1))
+      .setIsr(util.Arrays.asList(startId, startId + 1))
+      .setLeader(leader)
+  }
+
   private def topicsDeleteDelta(topicsImage: TopicsImage): TopicsDelta = {
     val delta = new TopicsDelta(topicsImage)
     delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID))
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index bdf3d0afc9c..81292bd4801 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -149,7 +149,7 @@ class ServerStartupTest extends QuorumTestHarness {
    }
    server = new KafkaServer(KafkaConfig.fromProps(props))
    server.startup()
-   assertEquals(!migrationEnabled, server.logManager.directoryIds.isEmpty)
+   assertEquals(!migrationEnabled, server.logManager.directoryIdsSet.isEmpty)
    server.shutdown()
  }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 913298b719a..7887753a9d1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, 
Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
@@ -1471,7 +1471,7 @@ object TestUtils extends Logging {
 
     if (log.isDefined) {
       val spyLogManager = Mockito.spy(logManager)
-      Mockito.doReturn(log.get, Nil: 
_*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), 
anyBoolean(), anyBoolean(), any(classOf[Option[Uuid]]))
+      Mockito.doReturn(log.get, Nil: 
_*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), 
anyBoolean(), anyBoolean(), any(classOf[Option[Uuid]]), 
any(classOf[Option[Uuid]]))
       spyLogManager
     } else
       logManager
@@ -1529,7 +1529,6 @@ object TestUtils extends Logging {
     val expands: AtomicInteger = new AtomicInteger(0)
     val shrinks: AtomicInteger = new AtomicInteger(0)
     val failures: AtomicInteger = new AtomicInteger(0)
-    val directory: AtomicReference[String] = new AtomicReference[String]()
 
     override def markIsrExpand(): Unit = expands.incrementAndGet()
 
@@ -1537,13 +1536,11 @@ object TestUtils extends Logging {
 
     override def markFailed(): Unit = failures.incrementAndGet()
 
-    override def assignDir(dir: String): Unit = directory.set(dir)
 
     def reset(): Unit = {
       expands.set(0)
       shrinks.set(0)
       failures.set(0)
-      directory.set(null)
     }
   }
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 0cdca1e3c4c..7a91a611a1a 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -182,7 +182,7 @@ public class ReplicaFetcherThreadBenchmark {
                     0, () -> -1, Time.SYSTEM, alterPartitionListener, new 
DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager, 
isrChannelManager);
 
-            partition.makeFollower(partitionState, offsetCheckpoints, topicId);
+            partition.makeFollower(partitionState, offsetCheckpoints, topicId, 
Option.empty());
             pool.put(tp, partition);
             initialFetchStates.put(tp, new InitialFetchState(topicId, new 
BrokerEndPoint(3, "host", 3000), 0, 0));
             BaseRecords fetched = new BaseRecords() {
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index cb8554083a7..b9f091a9a7c 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -125,7 +125,7 @@ public class PartitionMakeFollowerBenchmark {
             MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM,
             alterPartitionListener, delayedOperations,
             Mockito.mock(MetadataCache.class), logManager, 
alterPartitionManager);
-        partition.createLogIfNotExists(true, false, offsetCheckpoints, 
topicId);
+        partition.createLogIfNotExists(true, false, offsetCheckpoints, 
topicId, Option.empty());
         executorService.submit((Runnable) () -> {
             SimpleRecord[] simpleRecords = new SimpleRecord[] {
                 new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), 
"1".getBytes(StandardCharsets.UTF_8)),
@@ -158,6 +158,6 @@ public class PartitionMakeFollowerBenchmark {
             .setPartitionEpoch(1)
             .setReplicas(replicas)
             .setIsNew(true);
-        return partition.makeFollower(partitionState, offsetCheckpoints, 
topicId);
+        return partition.makeFollower(partitionState, offsetCheckpoints, 
topicId, Option.empty());
     }
 }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 0ebc3de2ff7..48befe4be4d 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -128,7 +128,7 @@ public class UpdateFollowerFetchStateBenchmark {
                 MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM,
                 alterPartitionListener, delayedOperations,
                 Mockito.mock(MetadataCache.class), logManager, 
alterPartitionManager);
-        partition.makeLeader(partitionState, offsetCheckpoints, topicId);
+        partition.makeLeader(partitionState, offsetCheckpoints, topicId, 
Option.empty());
         replica1 = partition.getReplica(1).get();
         replica2 = partition.getReplica(2).get();
     }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 67f6e43a826..0456ecda142 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -147,7 +147,7 @@ public class CheckpointBench {
         OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
             final Partition partition = this.replicaManager.createPartition(topicPartition);
-            partition.createLogIfNotExists(true, false, checkpoints, Option.apply(Uuid.randomUuid()));
+            partition.createLogIfNotExists(true, false, checkpoints, Option.apply(Uuid.randomUuid()), Option.empty());
         }
 
         replicaManager.checkpointHighWatermarks();
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 6f082581099..8972e44b42e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -218,7 +218,7 @@ public class PartitionCreationBench {
                     .setReplicas(replicas)
                     .setIsNew(true);
 
-            partition.makeFollower(partitionState, checkpoints, topicId);
+            partition.makeFollower(partitionState, checkpoints, topicId, Option.empty());
         }
     }
 }
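
The benchmark updates above all follow one pattern: Partition.makeLeader, Partition.makeFollower and Partition.createLogIfNotExists gain a trailing Option argument, and every call site in this patch passes Option.empty(). A minimal sketch of the new call shape from Java, assuming the added parameter is a scala.Option of a Uuid naming a target log directory (the parameter's declaration is not shown in these hunks):

    // Hypothetical caller; targetDir is an illustrative directory id, not
    // something this patch defines.
    Uuid targetDir = Uuid.randomUuid();
    // Request placement in a specific log directory:
    partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.apply(targetDir));
    // Keep the pre-patch behaviour (no directory preference), as every call
    // site in this commit does:
    partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());
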
diff --git a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
index 0fe8e3ef138..b0f93a21de1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
+++ b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -30,17 +31,20 @@ public final class LocalReplicaChanges {
     private final Map<TopicPartition, PartitionInfo> followers;
     // The topic name -> topic id map in leaders and followers changes
     private final Map<String, Uuid> topicIds;
+    private final Map<TopicIdPartition, Uuid> directoryIds;
 
     LocalReplicaChanges(
         Set<TopicPartition> deletes,
         Map<TopicPartition, PartitionInfo> leaders,
         Map<TopicPartition, PartitionInfo> followers,
-        Map<String, Uuid> topicIds
+        Map<String, Uuid> topicIds,
+        Map<TopicIdPartition, Uuid> directoryIds
     ) {
         this.deletes = deletes;
         this.leaders = leaders;
         this.followers = followers;
         this.topicIds = topicIds;
+        this.directoryIds = directoryIds;
     }
 
     public Set<TopicPartition> deletes() {
@@ -69,6 +73,10 @@ public final class LocalReplicaChanges {
         );
     }
 
+    public Map<TopicIdPartition, Uuid> directoryIds() {
+        return directoryIds;
+    }
+
     public static final class PartitionInfo {
         private final Uuid topicId;
         private final PartitionRegistration partition;
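
The directoryIds() accessor added above maps each changed replica to the Uuid of the log directory assigned to it on this broker. A purely illustrative consumer sketch; this loop does not appear in the patch:

    // changes is an assumed LocalReplicaChanges instance obtained from a
    // metadata delta; iterate the changed directory assignments.
    for (Map.Entry<TopicIdPartition, Uuid> entry : changes.directoryIds().entrySet()) {
        TopicIdPartition replica = entry.getKey();
        Uuid directoryId = entry.getValue();
        System.out.printf("replica %s assigned to directory %s%n", replica, directoryId);
    }
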
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index c0543b7de4d..4983afdb989 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -131,6 +132,7 @@ public final class TopicDelta {
         Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
         Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
         Map<String, Uuid> topicIds = new HashMap<>();
+        Map<TopicIdPartition, Uuid> directoryIds = new HashMap<>();
 
         for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
             if (!Replicas.contains(entry.getValue().replicas, brokerId)) {
@@ -160,9 +162,25 @@ public final class TopicDelta {
                     topicIds.putIfAbsent(name(), id());
                 }
             }
+
+            try {
+                PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
+                if (
+                        prevPartition == null ||
+                        prevPartition.directories == null ||
+                        prevPartition.directory(brokerId) != entry.getValue().directory(brokerId)
+                ) {
+                    directoryIds.put(
+                        new TopicIdPartition(id(), new TopicPartition(name(), entry.getKey())),
+                        entry.getValue().directory(brokerId)
+                    );
+                }
+            } catch (IllegalArgumentException e) {
+                // Do nothing if broker isn't part of the replica set.
+            }
         }
 
-        return new LocalReplicaChanges(deletes, leaders, followers, topicIds);
+        return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds);
     }
 
     @Override
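
The logic added to localChanges above emits a directoryIds entry for a partition exactly when there was no previous registration, the previous registration carried no directory assignments, or the directory assigned to this broker differs from before; PartitionRegistration.directory throws IllegalArgumentException for a broker outside the replica set, which the catch block deliberately swallows. Restated as a standalone predicate, a sketch only (this helper does not exist in the patch):

    // prev/next are the old and new registrations for one partition; returns
    // true when a directoryIds entry should be emitted for brokerId.
    static boolean directoryAssignmentChanged(PartitionRegistration prev,
                                              PartitionRegistration next,
                                              int brokerId) {
        try {
            return prev == null
                || prev.directories == null
                || prev.directory(brokerId) != next.directory(brokerId);
        } catch (IllegalArgumentException e) {
            return false; // brokerId is not in the replica set
        }
    }
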
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index ce65e16a127..2faa1f930d0 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -191,6 +192,7 @@ public final class TopicsDelta {
         Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
         Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
         Map<String, Uuid> topicIds = new HashMap<>();
+        Map<TopicIdPartition, Uuid> directoryIds = new HashMap<>();
 
         for (TopicDelta delta : changedTopics.values()) {
             LocalReplicaChanges changes = delta.localChanges(brokerId);
@@ -199,6 +201,7 @@ public final class TopicsDelta {
             leaders.putAll(changes.leaders());
             followers.putAll(changes.followers());
             topicIds.putAll(changes.topicIds());
+            directoryIds.putAll(changes.directoryIds());
         }
 
         // Add all of the removed topic partitions to the set of locally removed partitions
@@ -211,7 +214,7 @@ public final class TopicsDelta {
             });
         });
 
-        return new LocalReplicaChanges(deletes, leaders, followers, topicIds);
+        return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds);
     }
 
     @Override
