Repository: kafka Updated Branches: refs/heads/trunk 1503f7603 -> 497e669dd
KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gharatmayures...@gmail.com> and Sumant Tambe <suta...@yahoo.com> The last patch submitted by MayureshGharat (back in Dec 15) has been rebased to the latest trunk. I took care of a couple of test failures (MetricsTest) along the way. jjkoshy , granders , avianey , you may be interested in this PR. Author: Sumant Tambe <suta...@yahoo.com> Author: Mayuresh Gharat <mgha...@mgharat-ld1.linkedin.biz> Author: MayureshGharat <gharatmayures...@gmail.com> Reviewers: Joel Koshy <jjkosh...@gmail.com> Closes #1664 from sutambe/async-delete-topic Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/497e669d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/497e669d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/497e669d Branch: refs/heads/trunk Commit: 497e669dd806fc984f5dceaede4a1f40f4e77c48 Parents: 1503f76 Author: Mayuresh Gharat <gharatmayures...@gmail.com> Authored: Wed Nov 30 10:40:31 2016 -0800 Committer: Joel Koshy <jjko...@gmail.com> Committed: Wed Nov 30 10:40:31 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 5 +- .../main/scala/kafka/log/AbstractIndex.scala | 23 ++--- core/src/main/scala/kafka/log/Log.scala | 31 ++++--- core/src/main/scala/kafka/log/LogManager.scala | 91 ++++++++++++++++---- .../scala/kafka/server/ReplicaManager.scala | 17 ++-- 5 files changed, 114 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 4d3fb56..44d6a77 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ 
b/core/src/main/scala/kafka/cluster/Partition.scala @@ -144,12 +144,13 @@ class Partition(val topic: String, assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None + val topicPartition = TopicAndPartition(topic, partitionId) try { - logManager.deleteLog(TopicAndPartition(topic, partitionId)) + logManager.asyncDelete(topicPartition) removePartitionMetrics() } catch { case e: IOException => - fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e) + fatal(s"Error deleting the log for partition $topicPartition", e) Runtime.getRuntime().halt(1) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index d594f18..77ef0f7 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -33,11 +33,11 @@ import scala.math.ceil /** * The abstract index class which holds entry format agnostic methods. * - * @param _file The index file + * @param file The index file * @param baseOffset the base offset of the segment that this index is corresponding to. * @param maxIndexSize The maximum index size in bytes. 
*/ -abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) +abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { protected def entrySize: Int @@ -46,8 +46,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val @volatile protected var mmap: MappedByteBuffer = { - val newlyCreated = _file.createNewFile() - val raf = new RandomAccessFile(_file, "rw") + val newlyCreated = file.createNewFile() + val raf = new RandomAccessFile(file, "rw") try { /* pre-allocate the file if necessary */ if(newlyCreated) { @@ -92,11 +92,6 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val def entries: Int = _entries /** - * The index file - */ - def file: File = _file - - /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at * loading segments from disk or truncating back to an old segment where a new log segment became active; @@ -104,7 +99,7 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val */ def resize(newSize: Int) { inLock(lock) { - val raf = new RandomAccessFile(_file, "rw") + val raf = new RandomAccessFile(file, "rw") val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) val position = mmap.position @@ -128,8 +123,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val * @throws IOException if rename fails */ def renameTo(f: File) { - try Utils.atomicMoveWithFallback(_file.toPath, f.toPath) - finally _file = f + try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + finally file = f } /** @@ -145,10 +140,10 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val * Delete this index file */ def delete(): Boolean = { - 
info(s"Deleting index ${_file.getAbsolutePath}") + info(s"Deleting index ${file.getAbsolutePath}") if(Os.isWindows) CoreUtils.swallow(forceUnmap(mmap)) - _file.delete() + file.delete() } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4c286d2..9e3dfac 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -83,7 +83,7 @@ case class LogAppendInfo(var firstOffset: Long, * */ @threadsafe -class Log(val dir: File, +class Log(@volatile var dir: File, @volatile var config: LogConfig, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, @@ -222,7 +222,6 @@ class Log(val dir: File, error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } - segments.put(start, segment) } } @@ -263,12 +262,13 @@ class Log(val dir: File, initFileSize = this.initFileSize(), preallocate = config.preallocate)) } else { - recoverLog() - // reset the index size of the currently active log segment to allow more entries - activeSegment.index.resize(config.maxIndexSize) - activeSegment.timeIndex.resize(config.maxIndexSize) + if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { + recoverLog() + // reset the index size of the currently active log segment to allow more entries + activeSegment.index.resize(config.maxIndexSize) + activeSegment.timeIndex.resize(config.maxIndexSize) + } } - } private def updateLogEndOffset(messageOffset: Long) { @@ -833,7 +833,6 @@ class Log(val dir: File, */ private[log] def delete() { lock synchronized { - removeLogMetrics() logSegments.foreach(_.delete()) segments.clear() Utils.delete(dir) @@ -1046,6 +1045,9 @@ object Log { /** TODO: Get rid of CleanShutdownFile in 0.8.2 */ val 
CleanShutdownFile = ".kafka_cleanshutdown" + /** a directory that is scheduled to be deleted */ + val DeleteDirSuffix = "-delete" + /** * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros * so that ls sorts the files numerically. @@ -1092,10 +1094,18 @@ object Log { * Parse the topic and partition out of the directory name of a log */ def parseTopicPartitionName(dir: File): TopicAndPartition = { - val name: String = dir.getName - if (name == null || name.isEmpty || !name.contains('-')) { + val dirName = dir.getName + if (dirName == null || dirName.isEmpty || !dirName.contains('-')) { throwException(dir) } + + val name: String = + if (dirName.endsWith(DeleteDirSuffix)) { + dirName.substring(0, dirName.indexOf('.')) + } else { + dirName + } + val index = name.lastIndexOf('-') val topic: String = name.substring(0, index) val partition: String = name.substring(index + 1) @@ -1105,6 +1115,7 @@ object Log { TopicAndPartition(topic, partition.toInt) } + def throwException(dir: File) { throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " + "'" + dir.getName + "' is not in the form of topic-partition\n" + http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a5beb49..64b277a 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -18,13 +18,13 @@ package kafka.log import java.io._ -import java.util.concurrent.TimeUnit +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} import kafka.utils._ import scala.collection._ import scala.collection.JavaConverters._ -import kafka.common.{KafkaException, TopicAndPartition} +import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition} import 
kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown} import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future} @@ -53,8 +53,10 @@ class LogManager(val logDirs: Array[File], val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 + private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicAndPartition, Log]() + private val logsToBeDeleted = new LinkedBlockingQueue[Log]() createAndValidateLogDirs(logDirs) private val dirLocks = lockLogDirs(logDirs) @@ -150,12 +152,15 @@ class LogManager(val logDirs: Array[File], val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) - val previous = this.logs.put(topicPartition, current) - - if (previous != null) { - throw new IllegalArgumentException( - "Duplicate log directories found: %s, %s!".format( - current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { + this.logsToBeDeleted.add(current) + } else { + val previous = this.logs.put(topicPartition, current) + if (previous != null) { + throw new IllegalArgumentException( + "Duplicate log directories found: %s, %s!".format( + current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + } } } } @@ -204,6 +209,11 @@ class LogManager(val logDirs: Array[File], delay = InitialTaskDelayMs, period = flushCheckpointMs, TimeUnit.MILLISECONDS) + scheduler.schedule("kafka-delete-logs", + deleteLogs, + delay = InitialTaskDelayMs, + period = defaultConfig.fileDeleteDelayMs, + TimeUnit.MILLISECONDS) } if(cleanerConfig.enableCleaner) cleaner.startup() @@ -376,24 +386,69 @@ class LogManager(val logDirs: Array[File], } /** - * Delete a log. + * Delete logs marked for deletion. 
*/ - def deleteLog(topicAndPartition: TopicAndPartition) { - var removedLog: Log = null - logCreationOrDeletionLock synchronized { - removedLog = logs.remove(topicAndPartition) + private def deleteLogs(): Unit = { + try { + var failed = 0 + while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) { + val removedLog = logsToBeDeleted.take() + if (removedLog != null) { + try { + removedLog.delete() + info(s"Deleted log for partition ${removedLog.topicAndPartition} in ${removedLog.dir.getAbsolutePath}.") + } catch { + case e: Throwable => + error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e) + failed = failed + 1 + logsToBeDeleted.put(removedLog) + } + } + } + } catch { + case e: Throwable => + error(s"Exception in kafka-delete-logs thread.", e) + } +} + + /** + * Rename the directory of the given topic-partition "logdir" as "logdir.uuid-delete" and + * add it in the queue for deletion. + * @param topicAndPartition TopicAndPartition that needs to be deleted + */ + def asyncDelete(topicAndPartition: TopicAndPartition) = { + val removedLog: Log = logCreationOrDeletionLock synchronized { + logs.remove(topicAndPartition) + } if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. if (cleaner != null) { cleaner.abortCleaning(topicAndPartition) cleaner.updateCheckpoints(removedLog.dir.getParentFile) } - removedLog.delete() - info("Deleted log for partition [%s,%d] in %s."
- .format(topicAndPartition.topic, - topicAndPartition.partition, - removedLog.dir.getAbsolutePath)) + // renaming the directory to topic-partition.uniqueId-delete + val dirName = new StringBuilder(removedLog.name) + .append(".") + .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) + .append(Log.DeleteDirSuffix) + .toString() + removedLog.close() + val renamedDir = new File(removedLog.dir.getParent, dirName) + val renameSuccessful = removedLog.dir.renameTo(renamedDir) + if (renameSuccessful) { + removedLog.dir = renamedDir + // change the file pointers for log and index file + for (logSegment <- removedLog.logSegments) { + logSegment.log.file = new File(renamedDir, logSegment.log.file.getName) + logSegment.index.file = new File(renamedDir, logSegment.index.file.getName) + } + + logsToBeDeleted.add(removedLog) + removedLog.removeLogMetrics() + info(s"Log for partition ${removedLog.topicAndPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") + } else { + throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index febe8ad..d2ec200 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -219,10 +219,11 @@ class ReplicaManager(val config: KafkaConfig, scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) } - def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { - stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition 
[%s,%d]".format(localBrokerId, - deletePartition.toString, topic, partitionId)) + def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short = { + stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition") val errorCode = Errors.NONE.code + val topic = topicPartition.topic + val partitionId = topicPartition.partition getPartition(topic, partitionId) match { case Some(_) => if (deletePartition) { @@ -241,14 +242,12 @@ class ReplicaManager(val config: KafkaConfig, val topicAndPartition = TopicAndPartition(topic, partitionId) if(logManager.getLog(topicAndPartition).isDefined) { - logManager.deleteLog(topicAndPartition) + logManager.asyncDelete(topicAndPartition) } } - stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker" - .format(localBrokerId, deletePartition, topic, partitionId)) + stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker") } - stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]" - .format(localBrokerId, deletePartition, topic, partitionId)) + stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition") errorCode } @@ -265,7 +264,7 @@ class ReplicaManager(val config: KafkaConfig, // First stop fetchers for all partitions, then stop the corresponding replicas replicaFetcherManager.removeFetcherForPartitions(partitions) for (topicPartition <- partitions){ - val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions) + val errorCode = stopReplica(topicPartition, stopReplicaRequest.deletePartitions) responseMap.put(topicPartition, errorCode) } (responseMap, Errors.NONE.code)