This is an automated email from the ASF dual-hosted git repository. rndgstn pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 07490b929bd KAFKA-15365: Broker-side replica management changes (#14881) 07490b929bd is described below commit 07490b929bdf923195632bed7b494aa7d22d9f31 Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com> AuthorDate: Mon Dec 11 14:34:22 2023 +0000 KAFKA-15365: Broker-side replica management changes (#14881) Reviewers: Igor Soarez <soa...@apple.com>, Ron Dagostino <rndg...@gmail.com>, Proven Provenzano <pprovenz...@confluent.io> --- core/src/main/scala/kafka/cluster/Partition.scala | 40 ++-- core/src/main/scala/kafka/log/LogManager.scala | 24 ++- .../src/main/scala/kafka/server/BrokerServer.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 58 +++--- .../unit/kafka/cluster/PartitionLockTest.scala | 4 +- .../scala/unit/kafka/cluster/PartitionTest.scala | 10 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 76 +++++-- .../unit/kafka/server/ReplicaManagerTest.scala | 221 +++++++++++++++++---- .../unit/kafka/server/ServerStartupTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 7 +- .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +- .../partition/PartitionMakeFollowerBenchmark.java | 4 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- .../apache/kafka/jmh/server/CheckpointBench.java | 2 +- .../kafka/jmh/server/PartitionCreationBench.java | 2 +- .../apache/kafka/image/LocalReplicaChanges.java | 10 +- .../java/org/apache/kafka/image/TopicDelta.java | 20 +- .../java/org/apache/kafka/image/TopicsDelta.java | 5 +- 19 files changed, 361 insertions(+), 132 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b3e7fecef8a..094e3cda419 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -30,7 +30,7 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zookeeper.ZooKeeperClientException -import org.apache.kafka.common.TopicIdPartition +import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData} @@ -42,7 +42,6 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.utils.Time -import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard} @@ -85,7 +84,6 @@ trait AlterPartitionListener { def markIsrExpand(): Unit def markIsrShrink(): Unit def markFailed(): Unit - def assignDir(dir: String): Unit } class DelayedOperations(topicPartition: TopicPartition, @@ -120,10 +118,6 @@ object Partition { } override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark() - - override def assignDir(dir: String): Unit = { - replicaManager.maybeNotifyPartitionAssignedToDirectory(topicPartition, dir) - } } val delayedOperations = new DelayedOperations( @@ -447,7 +441,8 @@ class Partition(val topicPartition: TopicPartition, } } - def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = { + def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], + targetLogDirectoryId: Option[Uuid] = None): Unit = { def maybeCreate(logOpt: Option[UnifiedLog]): UnifiedLog = { logOpt match { case Some(log) => @@ -456,7 +451,7 @@ class Partition(val topicPartition: TopicPartition, topicId.foreach(log.assignTopicId) log case None => - createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) + createLog(isNew, isFutureReplica, offsetCheckpoints, topicId, targetLogDirectoryId) } } @@ -468,7 +463,8 @@ class Partition(val topicPartition: TopicPartition, } // Visible for testing - private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = { + private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, + topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): UnifiedLog = { def updateHighWatermark(log: UnifiedLog): Unit = { val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse { info(s"No checkpointed highwatermark is found for partition $topicPartition") @@ -481,11 +477,10 @@ class Partition(val topicPartition: TopicPartition, logManager.initializingLog(topicPartition) var maybeLog: Option[UnifiedLog] = None try { - val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId) + val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId, targetLogDirectoryId) if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener) maybeLog = Some(log) updateHighWatermark(log) - alterPartitionListener.assignDir(log.parentDir) log } finally { logManager.finishedInitializingLog(topicPartition, maybeLog) @@ -658,7 +653,6 @@ class Partition(val topicPartition: TopicPartition, // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted inWriteLock(leaderIsrUpdateLock) { clear() - listeners.forEach { listener => listener.onDeleted(topicPartition) } @@ -703,7 +697,8 @@ class Partition(val topicPartition: TopicPartition, */ def makeLeader(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, - topicId: Option[Uuid]): Boolean = { + topicId: Option[Uuid], + targetDirectoryId: Option[Uuid] = None): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { // Partition state changes are expected to have an partition epoch larger or equal // to the current partition epoch. The latter is allowed because the partition epoch @@ -745,7 +740,7 @@ class Partition(val topicPartition: TopicPartition, ) try { - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) + createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId) } catch { case e: ZooKeeperClientException => stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " + @@ -819,7 +814,8 @@ class Partition(val topicPartition: TopicPartition, */ def makeFollower(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, - topicId: Option[Uuid]): Boolean = { + topicId: Option[Uuid], + targetLogDirectoryId: Option[Uuid] = None): Boolean = { inWriteLock(leaderIsrUpdateLock) { if (partitionState.partitionEpoch < partitionEpoch) { stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + @@ -849,7 +845,7 @@ class Partition(val topicPartition: TopicPartition, ) try { - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) + createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId) } catch { case e: ZooKeeperClientException => stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " + @@ -875,6 +871,16 @@ class Partition(val topicPartition: TopicPartition, } } + private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { + targetLogDirectoryId match { + case Some(directoryId) => + createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + + case None => + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) + } + } + /** * Update the follower's state in the leader based on the last fetch request. See * [[Replica.updateFetchStateOrThrow()]] for details. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4f316e1f960..f84177d9717 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -25,7 +25,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.ConfigRepository import kafka.server._ import kafka.utils._ -import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid} import org.apache.kafka.common.utils.{KafkaThread, Time, Utils} import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException} @@ -123,7 +123,9 @@ class LogManager(logDirs: Seq[File], } private val dirLocks = lockLogDirs(liveLogDirs) - val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs) + private val directoryIds: mutable.Map[String, Uuid] = loadDirectoryIds(liveLogDirs) + def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet + @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir => @@ -216,6 +218,7 @@ class LogManager(logDirs: Seq[File], warn(s"Stopping serving logs in dir $dir") logCreationOrDeletionLock synchronized { _liveLogDirs.remove(new File(dir)) + directoryIds.remove(dir) if (_liveLogDirs.isEmpty) { fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed") Exit.halt(1) @@ -278,7 +281,7 @@ class LogManager(logDirs: Seq[File], * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. * Directories without a meta.properties don't get a directory ID assigned. */ - private def loadDirectoryIds(logDirs: Seq[File]): Map[String, Uuid] = { + private def loadDirectoryIds(logDirs: Seq[File]): mutable.Map[String, Uuid] = { val result = mutable.HashMap[String, Uuid]() logDirs.foreach(logDir => { try { @@ -991,10 +994,14 @@ class LogManager(logDirs: Seq[File], * @param isNew Whether the replica should have existed on the broker or not * @param isFuture True if the future log of the specified partition should be returned or created * @param topicId The topic ID of the partition's topic + * @param targetLogDirectoryId The directory Id that should host the the partition's topic. + * The next selected directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}. + * The method assumes provided Id belong to online directory. * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker * @throws InconsistentTopicIdException if the topic ID in the log does not match the topic ID provided */ - def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = { + def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, + topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = Option.empty): UnifiedLog = { logCreationOrDeletionLock synchronized { val log = getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread @@ -1002,7 +1009,14 @@ class LogManager(logDirs: Seq[File], throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { - val preferredLogDir = preferredLogDirs.get(topicPartition) + val preferredLogDir = targetLogDirectoryId.filterNot(Seq(DirectoryId.UNASSIGNED,DirectoryId.LOST).contains) match { + case Some(targetId) if !preferredLogDirs.containsKey(topicPartition) => + // If partition is configured with both targetLogDirectoryId and preferredLogDirs, then + // preferredLogDirs will be respected, otherwise targetLogDirectoryId will be respected + directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null) + case _ => + preferredLogDirs.get(topicPartition) + } if (isFuture) { if (preferredLogDir == null) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index fccceadbcec..6701481a7c6 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -210,7 +210,7 @@ class BrokerServer( time, s"broker-${config.nodeId}-", isZkBroker = false, - logDirs = logManager.directoryIds.values.toSet) + logDirs = logManager.directoryIdsSet) // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1699842d10f..1d3275632f0 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -417,7 +417,7 @@ class KafkaServer( time, s"zk-broker-${config.nodeId}-", isZkBroker = true, - logManager.directoryIds.values.toSet) + logManager.directoryIdsSet) // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller val controllerQuorumVotersFuture = CompletableFuture.completedFuture( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b0e31fbb415..60f69caad4e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName} import kafka.server.ReplicaManager.createLogReadResult import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} -import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} +import kafka.server.metadata.ZkMetadataCache import kafka.utils.Implicits._ import kafka.utils._ import kafka.zk.KafkaZkClient @@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig, threadNamePrefix: Option[String] = None, val brokerEpochSupplier: () => Long = () => -1, addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None, - directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP + val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP ) extends Logging { private val metricsGroup = new KafkaMetricsGroup(this.getClass) @@ -2352,32 +2352,6 @@ class ReplicaManager(val config: KafkaConfig, warn(s"Stopped serving replicas in dir $dir") } - /** - * Called when a topic partition is placed in a log directory. - * If a directory event listener is configured, - * and if the selected log directory has an assigned Uuid, - * then the listener is notified of the assignment. - */ - def maybeNotifyPartitionAssignedToDirectory(tp: TopicPartition, dir: String): Unit = { - if (metadataCache.isInstanceOf[KRaftMetadataCache]) { - logManager.directoryId(dir) match { - case None => throw new IllegalStateException(s"Assignment into unidentified directory: ${dir}") - case Some(dirId) => - getPartition(tp) match { - case HostedPartition.Offline | HostedPartition.None => - throw new IllegalStateException("Assignment for a partition that is not online") - case HostedPartition.Online(partition) => partition.topicId match { - case None => - throw new IllegalStateException(s"Assignment for topic without ID: ${tp.topic()}") - case Some(topicId) => - val topicIdPartition = new common.TopicIdPartition(topicId, tp.partition()) - directoryEventHandler.handleAssignment(topicIdPartition, dirId, () => ()) - } - } - } - } - } - def removeMetrics(): Unit = { ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric) } @@ -2598,10 +2572,10 @@ class ReplicaManager(val config: KafkaConfig, val leaderChangedPartitions = new mutable.HashSet[Partition] val followerChangedPartitions = new mutable.HashSet[Partition] if (!localChanges.leaders.isEmpty) { - applyLocalLeadersDelta(leaderChangedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala) + applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala) } if (!localChanges.followers.isEmpty) { - applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala) + applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala, localChanges.directoryIds.asScala) } maybeAddLogDirFetchers(leaderChangedPartitions ++ followerChangedPartitions, lazyOffsetCheckpoints, @@ -2612,14 +2586,18 @@ class ReplicaManager(val config: KafkaConfig, remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds())) } + + localChanges.directoryIds.forEach(maybeUpdateTopicAssignment) } } private def applyLocalLeadersDelta( changedPartitions: mutable.Set[Partition], + newImage: MetadataImage, delta: TopicsDelta, offsetCheckpoints: OffsetCheckpoints, - localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo] + localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo], + directoryIds: mutable.Map[TopicIdPartition, Uuid] ): Unit = { stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " + "local leaders.") @@ -2628,7 +2606,9 @@ class ReplicaManager(val config: KafkaConfig, getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) - partition.makeLeader(state, offsetCheckpoints, Some(info.topicId)) + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) + changedPartitions.add(partition) } catch { case e: KafkaStorageException => @@ -2649,7 +2629,8 @@ class ReplicaManager(val config: KafkaConfig, newImage: MetadataImage, delta: TopicsDelta, offsetCheckpoints: OffsetCheckpoints, - localFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo] + localFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo], + directoryIds: mutable.Map[TopicIdPartition, Uuid] ): Unit = { stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " + "local followers.") @@ -2667,7 +2648,8 @@ class ReplicaManager(val config: KafkaConfig, // is unavailable. This is required to ensure that we include the partition's // high watermark in the checkpoint file (see KAFKA-1647). val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) - val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId)) + val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2) + val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId) if (isInControlledShutdown && (info.partition.leader == NO_LEADER || !info.partition.isr.contains(config.brokerId))) { @@ -2742,6 +2724,14 @@ class ReplicaManager(val config: KafkaConfig, } } + private def maybeUpdateTopicAssignment(partition: TopicIdPartition, partitionDirectoryId: Uuid): Unit = { + for { + topicPartitionActualLog <- logManager.getLog(partition.topicPartition(), false) + topicPartitionActualDirectoryId <- logManager.directoryId(topicPartitionActualLog.dir.getParent) + if partitionDirectoryId != topicPartitionActualDirectoryId + } directoryEventHandler.handleAssignment(new common.TopicIdPartition(partition.topicId, partition.partition()), topicPartitionActualDirectoryId, () => ()) + } + def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = { stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, exception) => exception match { diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 445371c108b..054d1690d80 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -293,8 +293,8 @@ class PartitionLockTest extends Logging { } } - override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = { - val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None) + override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): UnifiedLog = { + val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None) val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "") diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index b908fcc5e59..9180147618f 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -427,8 +427,8 @@ class PartitionTest extends AbstractPartitionTest { logManager, alterPartitionManager) { - override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = { - val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None) + override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): UnifiedLog = { + val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None, None) val logDirFailureChannel = new LogDirFailureChannel(1) val segments = new LogSegments(log.topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "") @@ -2994,7 +2994,7 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) - partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None) + partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None, targetLogDirectoryId = None) // Validate that initializingLog and finishedInitializingLog was called verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) @@ -3033,7 +3033,7 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) - partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None) + partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None, targetLogDirectoryId = None) // Validate that initializingLog and finishedInitializingLog was called verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) @@ -3075,7 +3075,7 @@ class PartitionTest extends AbstractPartitionTest { spyLogManager, alterPartitionManager) - partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None) + partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None, targetLogDirectoryId = None) // Validate that initializingLog and finishedInitializingLog was called verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index e14e2eb8e07..b2a16b5bfa6 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -26,7 +26,7 @@ import org.apache.directory.api.util.FileUtils import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -93,6 +93,54 @@ class LogManagerTest { log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) } + /** + * Test that getOrCreateLog on a non-existent log creates a new log in given logDirectory using directory id and that we can append to the new log. + */ + @Test + def testCreateLogOnTargetedLogDirectory(): Unit = { + val targetedLogDirectoryId = DirectoryId.random() + + val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir()) + writeMetaProperties(dirs(0)) + writeMetaProperties(dirs(1), Optional.of(targetedLogDirectoryId)) + writeMetaProperties(dirs(3), Optional.of(DirectoryId.random())) + writeMetaProperties(dirs(4)) + + logManager = createLogManager(dirs) + + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None, targetLogDirectoryId = Some(targetedLogDirectoryId)) + assertEquals(5, logManager.liveLogDirs.size) + + val logFile = new File(dirs(1), name + "-0") + assertTrue(logFile.exists) + assertEquals(dirs(1).getAbsolutePath, logFile.getParent) + log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) + } + + /** + * Test that getOrCreateLog on a non-existent log creates a new log in the next selected logDirectory if the given directory id is DirectoryId.UNASSIGNED. + */ + @Test + def testCreateLogWithTargetedLogDirectorySetAsUnassigned(): Unit = { + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None, targetLogDirectoryId = Some(DirectoryId.UNASSIGNED)) + assertEquals(1, logManager.liveLogDirs.size) + val logFile = new File(logDir, name + "-0") + assertTrue(logFile.exists) + assertFalse(logManager.directoryId(logFile.getParent).equals(DirectoryId.UNASSIGNED)) + log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) + } + + @Test + def testCreateLogWithTargetedLogDirectorySetAsUnknownWithoutAnyOfflineDirectories(): Unit = { + + val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None, targetLogDirectoryId = Some(DirectoryId.LOST)) + assertEquals(1, logManager.liveLogDirs.size) + val logFile = new File(logDir, name + "-0") + assertTrue(logFile.exists) + assertFalse(logManager.directoryId(logFile.getParent).equals(DirectoryId.random())) + log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) + } + /** * Tests that all internal futures are completed before LogManager.shutdown() returns to the * caller during error situations. @@ -1044,19 +1092,6 @@ class LogManagerTest { @Test def testLoadDirectoryIds(): Unit = { - def writeMetaProperties( - dir: File, - directoryId: Optional[Uuid] = Optional.empty() - ): Unit = { - val metaProps = new MetaProperties.Builder(). - setVersion(MetaPropertiesVersion.V0). - setClusterId("IVT1Seu3QjacxS7oBTKhDQ"). - setNodeId(1). - setDirectoryId(directoryId). - build() - PropertiesUtils.writePropertiesFile(metaProps.toProperties, - new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false) - } val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir()) writeMetaProperties(dirs(0)) writeMetaProperties(dirs(1), Optional.of(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q"))) @@ -1072,6 +1107,17 @@ class LogManagerTest { assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath)) assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), logManager.directoryId(dirs(3).getAbsolutePath)) assertTrue(logManager.directoryId(dirs(3).getAbsolutePath).isDefined) - assertEquals(2, logManager.directoryIds.size) + assertEquals(2, logManager.directoryIdsSet.size) + } + + def writeMetaProperties(dir: File, directoryId: Optional[Uuid] = Optional.empty()): Unit = { + val metaProps = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V0). + setClusterId("IVT1Seu3QjacxS7oBTKhDQ"). + setNodeId(1). + setDirectoryId(directoryId). + build() + PropertiesUtils.writePropertiesFile(metaProps.toProperties, + new File(dir, MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a4ececf2197..e40fd90f318 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -52,11 +52,11 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} -import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.image._ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState -import org.apache.kafka.server.common.OffsetAndEpoch +import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch} import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.{MockScheduler, MockTime} @@ -68,6 +68,7 @@ import org.junit.jupiter.params.provider.{EnumSource, ValueSource} import com.yammer.metrics.core.Gauge import kafka.log.remote.RemoteLogManager import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} +import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.util.timer.MockTimer @@ -76,7 +77,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyMap, anySet, anyString} -import org.mockito.Mockito.{doAnswer, doReturn, mock, mockConstruction, never, reset, spy, times, verify, verifyNoMoreInteractions, when} +import org.mockito.Mockito.{atLeastOnce, doAnswer, doReturn, mock, mockConstruction, never, reset, spy, times, verify, verifyNoInteractions, verifyNoMoreInteractions, when} import scala.collection.{Map, Seq, mutable} import scala.compat.java8.OptionConverters.RichOptionForJava8 @@ -2779,7 +2780,7 @@ class ReplicaManagerTest { val topicPartitionObj = new TopicPartition(topic, topicPartition) val mockLogMgr: LogManager = mock(classOf[LogManager]) when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile)) - when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any())).thenReturn(mockLog) + when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any(), any())).thenReturn(mockLog) when(mockLogMgr.getLog(topicPartitionObj, isFuture = false)).thenReturn(Some(mockLog)) when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None) val allLogs = new Pool[TopicPartition, UnifiedLog]() @@ -3119,7 +3120,9 @@ class ReplicaManagerTest { enableRemoteStorage: Boolean = false, shouldMockLog: Boolean = false, remoteLogManager: Option[RemoteLogManager] = None, - defaultTopicRemoteLogStorageEnable: Boolean = true + defaultTopicRemoteLogStorageEnable: Boolean = true, + setupLogDirMetaProperties: Boolean = false, + directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP ): ReplicaManager = { val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath @@ -3132,6 +3135,19 @@ class ReplicaManagerTest { logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") } val mockLog = setupMockLog(path1) + if (setupLogDirMetaProperties) { + // add meta.properties file in each dir + config.logDirs.foreach(dir => { + val metaProps = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V0). + setClusterId("clusterId"). + setNodeId(brokerId). + setDirectoryId(DirectoryId.random()). + build() + PropertiesUtils.writePropertiesFile(metaProps.toProperties, + new File(new File(dir), MetaPropertiesEnsemble.META_PROPERTIES_NAME).getAbsolutePath, false) + }) + } val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage) val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) @@ -3175,6 +3191,7 @@ class ReplicaManagerTest { delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory), threadNamePrefix = Option(this.getClass.getName), addPartitionsToTxnManager = Some(addPartitionsToTxnManager), + directoryEventHandler = directoryEventHandler, remoteLogManager = if (enableRemoteStorage) { if (remoteLogManager.isDefined) remoteLogManager @@ -4408,6 +4425,138 @@ class ReplicaManagerTest { assertEquals(followerPartitions, actualFollowerPartitions) } + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory(enableRemoteStorage: Boolean): Unit = { + val localId = 1 + val topicPartition0 = new TopicPartition("foo", 0) + val topicPartition1 = new TopicPartition("foo", 1) + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, + enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + + try { + + // Test applying delta as leader + val directoryIds = replicaManager.logManager.directoryIdsSet.toList + // Make the local replica the leader + val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds) + val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) + replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) + + // Check the broker shouldn't updated the controller with the correct assignment. + verifyNoInteractions(replicaManager.directoryEventHandler) + val logDirIdHostingPartition0 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get + assertEquals(directoryIds.head, logDirIdHostingPartition0) + + // Test applying delta as follower + val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = directoryIds) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + + replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) + + // Check the broker shouldn't updated the controller with the correct assignment. + verifyNoInteractions(replicaManager.directoryEventHandler) + val logDirIdHostingPartition1 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get + assertEquals(directoryIds.head, logDirIdHostingPartition1) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testApplyDeltaShouldHandleReplicaAssignedToUnassignedDirectory(enableRemoteStorage: Boolean): Unit = { + val localId = 1 + val topicPartition0 = new TopicPartition("foo", 0) + val topicPartition1 = new TopicPartition("foo", 1) + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, + enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + + try { + // Make the local replica the leader + val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) + val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) + val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id + val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0) + val topicIdPartition1 = new TopicIdPartition(topicId, topicPartition1) + replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) + + // Make the local replica the as follower + val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) + + // Check the broker updated the controller with the correct assignment. + val topicIdPartitionCapture: ArgumentCaptor[org.apache.kafka.server.common.TopicIdPartition] = + ArgumentCaptor.forClass(classOf[org.apache.kafka.server.common.TopicIdPartition]) + val logIdCaptureForPartition: ArgumentCaptor[Uuid] = ArgumentCaptor.forClass(classOf[Uuid]) + verify(replicaManager.directoryEventHandler, atLeastOnce()).handleAssignment(topicIdPartitionCapture.capture(), logIdCaptureForPartition.capture(), any()) + + assertEquals(topicIdPartitionCapture.getAllValues.asScala, + List( + new org.apache.kafka.server.common.TopicIdPartition(topicId, topicIdPartition0.partition()), + new org.apache.kafka.server.common.TopicIdPartition(topicId, topicIdPartition1.partition()) + ) + ) + val logDirIdHostingPartition0 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get + val logDirIdHostingPartition1 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get + assertEquals(logIdCaptureForPartition.getAllValues.asScala, List(logDirIdHostingPartition0, logDirIdHostingPartition1)) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testApplyDeltaShouldHandleReplicaAssignedToLostDirectory(enableRemoteStorage: Boolean): Unit = { + val localId = 1 + val topicPartition0 = new TopicPartition("foo", 0) + val topicPartition1 = new TopicPartition("foo", 1) + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, + enableRemoteStorage = enableRemoteStorage, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + + try { + // Make the local replica the leader + val leaderTopicsDelta = topicsCreateDelta(localId, true, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST)) + val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) + val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id + val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0) + val topicIdPartition1 = new TopicIdPartition(topicId, topicPartition1) + replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) + + // Make the local replica the as follower + val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST)) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) + + // Check the broker updated the controller with the correct assignment. + val topicIdPartitionCapture: ArgumentCaptor[org.apache.kafka.server.common.TopicIdPartition] = + ArgumentCaptor.forClass(classOf[org.apache.kafka.server.common.TopicIdPartition]) + val logIdCaptureForPartition: ArgumentCaptor[Uuid] = ArgumentCaptor.forClass(classOf[Uuid]) + verify(replicaManager.directoryEventHandler, atLeastOnce()).handleAssignment(topicIdPartitionCapture.capture(), logIdCaptureForPartition.capture(), any()) + + assertEquals(topicIdPartitionCapture.getAllValues.asScala, + List( + new org.apache.kafka.server.common.TopicIdPartition(topicId, topicIdPartition0.partition()), + new org.apache.kafka.server.common.TopicIdPartition(topicId, topicIdPartition1.partition()) + ) + ) + val logDirIdHostingPartition0 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition0).get.dir.getParent).get + val logDirIdHostingPartition1 = replicaManager.logManager.directoryId(replicaManager.logManager.getLog(topicPartition1).get.dir.getParent).get + assertEquals(logIdCaptureForPartition.getAllValues.asScala, List(logDirIdHostingPartition0, logDirIdHostingPartition1)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testDeltaFromLeaderToFollower(enableRemoteStorage: Boolean): Unit = { @@ -5140,17 +5289,7 @@ class ReplicaManagerTest { // Make the local replica the follower. var followerTopicsDelta = new TopicsDelta(TopicsImage.EMPTY) followerTopicsDelta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) - followerTopicsDelta.replay(new PartitionRecord() - .setPartitionId(0) - .setTopicId(FOO_UUID) - .setReplicas(util.Arrays.asList(localId, localId + 1)) - .setIsr(util.Arrays.asList(localId, localId + 1)) - .setRemovingReplicas(Collections.emptyList()) - .setAddingReplicas(Collections.emptyList()) - .setLeader(localId + 1) - .setLeaderEpoch(0) - .setPartitionEpoch(0) - ) + followerTopicsDelta.replay(partitionRecord(localId, localId + 1)) var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) @@ -5477,40 +5616,48 @@ class ReplicaManagerTest { } } - private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = { + private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = { val leader = if (isStartIdLeader) startId else startId + 1 val delta = new TopicsDelta(TopicsImage.EMPTY) delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) - delta.replay( - new PartitionRecord() - .setPartitionId(0) - .setTopicId(FOO_UUID) - .setReplicas(util.Arrays.asList(startId, startId + 1)) - .setIsr(util.Arrays.asList(startId, startId + 1)) - .setRemovingReplicas(Collections.emptyList()) - .setAddingReplicas(Collections.emptyList()) - .setLeader(leader) - .setLeaderEpoch(0) - .setPartitionEpoch(0) - ) + val record = partitionRecord(startId, leader, partition) + if (!directoryIds.isEmpty) { + record.setDirectories(directoryIds.asJava) + } + delta.replay(record) delta } + private def partitionRecord(startId: Int, leader: Int, partition: Int = 0) = { + new PartitionRecord() + .setPartitionId(partition) + .setTopicId(FOO_UUID) + .setReplicas(util.Arrays.asList(startId, startId + 1)) + .setIsr(util.Arrays.asList(startId, startId + 1)) + .setRemovingReplicas(Collections.emptyList()) + .setAddingReplicas(Collections.emptyList()) + .setLeader(leader) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + } + private def topicsChangeDelta(topicsImage: TopicsImage, startId: Int, isStartIdLeader: Boolean): TopicsDelta = { val leader = if (isStartIdLeader) startId else startId + 1 val delta = new TopicsDelta(topicsImage) - delta.replay( - new PartitionChangeRecord() - .setPartitionId(0) - .setTopicId(FOO_UUID) - .setReplicas(util.Arrays.asList(startId, startId + 1)) - .setIsr(util.Arrays.asList(startId, startId + 1)) - .setLeader(leader) - ) + delta.replay(partitionChangeRecord(startId, leader)) delta } + private def partitionChangeRecord(startId: Int, leader: Int) = { + new PartitionChangeRecord() + .setPartitionId(0) + .setTopicId(FOO_UUID) + .setReplicas(util.Arrays.asList(startId, startId + 1)) + .setIsr(util.Arrays.asList(startId, startId + 1)) + .setLeader(leader) + } + private def topicsDeleteDelta(topicsImage: TopicsImage): TopicsDelta = { val delta = new TopicsDelta(topicsImage) delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID)) diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index bdf3d0afc9c..81292bd4801 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -149,7 +149,7 @@ class ServerStartupTest extends QuorumTestHarness { } server = new KafkaServer(KafkaConfig.fromProps(props)) server.startup() - assertEquals(!migrationEnabled, server.logManager.directoryIds.isEmpty) + assertEquals(!migrationEnabled, server.logManager.directoryIdsSet.isEmpty) server.shutdown() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 913298b719a..7887753a9d1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption} import java.security.cert.X509Certificate import java.time.Duration import java.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit} import java.util.{Arrays, Collections, Optional, Properties} import com.yammer.metrics.core.{Gauge, Histogram, Meter} @@ -1471,7 +1471,7 @@ object TestUtils extends Logging { if (log.isDefined) { val spyLogManager = Mockito.spy(logManager) - Mockito.doReturn(log.get, Nil: _*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Option[Uuid]])) + Mockito.doReturn(log.get, Nil: _*).when(spyLogManager).getOrCreateLog(any(classOf[TopicPartition]), anyBoolean(), anyBoolean(), any(classOf[Option[Uuid]]), any(classOf[Option[Uuid]])) spyLogManager } else logManager @@ -1529,7 +1529,6 @@ object TestUtils extends Logging { val expands: AtomicInteger = new AtomicInteger(0) val shrinks: AtomicInteger = new AtomicInteger(0) val failures: AtomicInteger = new AtomicInteger(0) - val directory: AtomicReference[String] = new AtomicReference[String]() override def markIsrExpand(): Unit = expands.incrementAndGet() @@ -1537,13 +1536,11 @@ object TestUtils extends Logging { override def markFailed(): Unit = failures.incrementAndGet() - override def assignDir(dir: String): Unit = directory.set(dir) def reset(): Unit = { expands.set(0) shrinks.set(0) failures.set(0) - directory.set(null) } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 0cdca1e3c4c..7a91a611a1a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -182,7 +182,7 @@ public class ReplicaFetcherThreadBenchmark { 0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp), Mockito.mock(MetadataCache.class), logManager, isrChannelManager); - partition.makeFollower(partitionState, offsetCheckpoints, topicId); + partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty()); pool.put(tp, partition); initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0)); BaseRecords fetched = new BaseRecords() { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index cb8554083a7..b9f091a9a7c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -125,7 +125,7 @@ public class PartitionMakeFollowerBenchmark { MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); - partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId); + partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId, Option.empty()); executorService.submit((Runnable) () -> { SimpleRecord[] simpleRecords = new SimpleRecord[] { new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)), @@ -158,6 +158,6 @@ public class PartitionMakeFollowerBenchmark { .setPartitionEpoch(1) .setReplicas(replicas) .setIsNew(true); - return partition.makeFollower(partitionState, offsetCheckpoints, topicId); + return partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty()); } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 0ebc3de2ff7..48befe4be4d 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -128,7 +128,7 @@ public class UpdateFollowerFetchStateBenchmark { MetadataVersion.latest(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); - partition.makeLeader(partitionState, offsetCheckpoints, topicId); + partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty()); replica1 = partition.getReplica(1).get(); replica2 = partition.getReplica(2).get(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 67f6e43a826..0456ecda142 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -147,7 +147,7 @@ public class CheckpointBench { OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L); for (TopicPartition topicPartition : topicPartitions) { final Partition partition = this.replicaManager.createPartition(topicPartition); - partition.createLogIfNotExists(true, false, checkpoints, Option.apply(Uuid.randomUuid())); + partition.createLogIfNotExists(true, false, checkpoints, Option.apply(Uuid.randomUuid()), Option.empty()); } replicaManager.checkpointHighWatermarks(); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 6f082581099..8972e44b42e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -218,7 +218,7 @@ public class PartitionCreationBench { .setReplicas(replicas) .setIsNew(true); - partition.makeFollower(partitionState, checkpoints, topicId); + partition.makeFollower(partitionState, checkpoints, topicId, Option.empty()); } } } diff --git a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java index 0fe8e3ef138..b0f93a21de1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java +++ b/metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.metadata.PartitionRegistration; @@ -30,17 +31,20 @@ public final class LocalReplicaChanges { private final Map<TopicPartition, PartitionInfo> followers; // The topic name -> topic id map in leaders and followers changes private final Map<String, Uuid> topicIds; + private final Map<TopicIdPartition, Uuid> directoryIds; LocalReplicaChanges( Set<TopicPartition> deletes, Map<TopicPartition, PartitionInfo> leaders, Map<TopicPartition, PartitionInfo> followers, - Map<String, Uuid> topicIds + Map<String, Uuid> topicIds, + Map<TopicIdPartition, Uuid> directoryIds ) { this.deletes = deletes; this.leaders = leaders; this.followers = followers; this.topicIds = topicIds; + this.directoryIds = directoryIds; } public Set<TopicPartition> deletes() { @@ -69,6 +73,10 @@ public final class LocalReplicaChanges { ); } + public Map<TopicIdPartition, Uuid> directoryIds() { + return directoryIds; + } + public static final class PartitionInfo { private final Uuid topicId; private final PartitionRegistration partition; diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java index c0543b7de4d..4983afdb989 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -131,6 +132,7 @@ public final class TopicDelta { Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>(); Map<String, Uuid> topicIds = new HashMap<>(); + Map<TopicIdPartition, Uuid> directoryIds = new HashMap<>(); for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) { if (!Replicas.contains(entry.getValue().replicas, brokerId)) { @@ -160,9 +162,25 @@ public final class TopicDelta { topicIds.putIfAbsent(name(), id()); } } + + try { + PartitionRegistration prevPartition = image.partitions().get(entry.getKey()); + if ( + prevPartition == null || + prevPartition.directories == null || + prevPartition.directory(brokerId) != entry.getValue().directory(brokerId) + ) { + directoryIds.put( + new TopicIdPartition(id(), new TopicPartition(name(), entry.getKey())), + entry.getValue().directory(brokerId) + ); + } + } catch (IllegalArgumentException e) { + // Do nothing if broker isn't part of the replica set. + } } - return new LocalReplicaChanges(deletes, leaders, followers, topicIds); + return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java index ce65e16a127..2faa1f930d0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionChangeRecord; @@ -191,6 +192,7 @@ public final class TopicsDelta { Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>(); Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>(); Map<String, Uuid> topicIds = new HashMap<>(); + Map<TopicIdPartition, Uuid> directoryIds = new HashMap<>(); for (TopicDelta delta : changedTopics.values()) { LocalReplicaChanges changes = delta.localChanges(brokerId); @@ -199,6 +201,7 @@ public final class TopicsDelta { leaders.putAll(changes.leaders()); followers.putAll(changes.followers()); topicIds.putAll(changes.topicIds()); + directoryIds.putAll(changes.directoryIds()); } // Add all of the removed topic partitions to the set of locally removed partitions @@ -211,7 +214,7 @@ public final class TopicsDelta { }); }); - return new LocalReplicaChanges(deletes, leaders, followers, topicIds); + return new LocalReplicaChanges(deletes, leaders, followers, topicIds, directoryIds); } @Override