Repository: kafka Updated Branches: refs/heads/trunk 786867c2e -> 503bd3664
KAFKA-2436; log.retention.hours should be honored by LogManager Author: Dong Lin <lindon...@gmail.com> Reviewers: Joel Koshy, Gwen Shapira Closes #142 from lindong28/KAFKA-2436 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366 Branch: refs/heads/trunk Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5 Parents: 786867c Author: Dong Lin <lindon...@gmail.com> Authored: Tue Aug 18 13:03:11 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Tue Aug 18 13:03:11 2015 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogConfig.scala | 5 +- .../main/scala/kafka/server/KafkaConfig.scala | 8 +-- .../main/scala/kafka/server/KafkaServer.scala | 65 ++++++++++---------- .../scala/unit/kafka/log/LogConfigTest.scala | 19 ++++++ 4 files changed, 58 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index c969d16..7fc7e33 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -47,7 +47,10 @@ object Defaults { } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { - + /** + * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig + * should also go in copyKafkaConfigToLog. + */ val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 394f21b..c39402c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -335,7 +335,7 @@ object KafkaConfig { val LogRetentionTimeHoursDoc = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " property" val LogRetentionBytesDoc = "The maximum size of the log before deleting it" - val LogCleanupIntervalMsDoc = "The frequency in minutes that the log cleaner checks whether any log is eligible for deletion" + val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion" val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window, must be either \"delete\" or \"compact\"" val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning" val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average" @@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)) val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) + val logRetentionTimeMillis = getLogRetentionTimeMillis val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) - val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) + val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) @@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) - val uncleanLeaderElectionEnable = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) val interBrokerSecurityProtocol = SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) val interBrokerProtocolVersion = ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp)) @@ -713,7 +714,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val listeners = getListeners val advertisedListeners = getAdvertisedListeners - val logRetentionTimeMillis = getLogRetentionTimeMillis private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 6d65507..a0e3fdf 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -43,6 +43,36 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator} + +object KafkaServer { + // Copy the subset of properties that are relevant to Logs + // I'm listing out individual properties here since the names are slightly different in each Config class... + def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = { + val logProps = new util.HashMap[String, Object]() + logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes) + logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis) + logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis) + logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes) + logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages) + logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs) + logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes) + logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long) + logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes) + logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes) + logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs) + logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio) + logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy) + logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas) + logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType) + logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable) + logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable) + logProps + } +} + + + /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. @@ -387,7 +417,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def boundPort(): Int = socketServer.boundPort() private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { - val defaultProps = copyKafkaConfigToLog(config.originals) + val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) @@ -413,39 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time = time) } - // Copy the subset of properties that are relevant to Logs - // I'm listing out individual properties here since the names are slightly different in each Config class... - private def copyKafkaConfigToLog(serverProps: java.util.Map[String, Object]): java.util.Map[String, Object] = { - - val logProps = new util.HashMap[String, Object]() - val entryset = serverProps.entrySet.iterator - while (entryset.hasNext) { - val entry = entryset.next - entry.getKey match { - case KafkaConfig.LogSegmentBytesProp => logProps.put(LogConfig.SegmentBytesProp, entry.getValue) - case KafkaConfig.LogRollTimeMillisProp => logProps.put(LogConfig.SegmentMsProp, entry.getValue) - case KafkaConfig.LogRollTimeJitterMillisProp => logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue) - case KafkaConfig.LogIndexSizeMaxBytesProp => logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue) - case KafkaConfig.LogFlushIntervalMessagesProp => logProps.put(LogConfig.FlushMessagesProp, entry.getValue) - case KafkaConfig.LogFlushIntervalMsProp => logProps.put(LogConfig.FlushMsProp, entry.getValue) - case KafkaConfig.LogRetentionBytesProp => logProps.put(LogConfig.RetentionBytesProp, entry.getValue) - case KafkaConfig.LogRetentionTimeMillisProp => logProps.put(LogConfig.RetentionMsProp, entry.getValue) - case KafkaConfig.MessageMaxBytesProp => logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue) - case KafkaConfig.LogIndexIntervalBytesProp => logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue) - case KafkaConfig.LogCleanerDeleteRetentionMsProp => logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue) - case KafkaConfig.LogDeleteDelayMsProp => logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue) - case KafkaConfig.LogCleanerMinCleanRatioProp => logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue) - case KafkaConfig.LogCleanupPolicyProp => logProps.put(LogConfig.CleanupPolicyProp, entry.getValue) - case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue) - case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue) - case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue) - case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue) - case _ => // we just leave those out - } - } - logProps - } - /** * Generates new brokerId or reads from meta.properties based on following conditions * <ol> http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/test/scala/unit/kafka/log/LogConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 348b012..51cd62c 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -19,13 +19,32 @@ package kafka.log import java.util.Properties +import kafka.server.KafkaConfig +import kafka.server.KafkaServer +import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigException import org.junit.{Assert, Test} +import org.junit.Assert._ import org.scalatest.Assertions._ class LogConfigTest { @Test + def testKafkaConfigToProps() { + val millisInHour = 60L * 60L * 1000L + val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + kafkaProps.put(KafkaConfig.LogRollTimeHoursProp, "2") + kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2") + kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2") + + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig) + assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp)) + assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp)) + assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp)) + } + + @Test def testFromPropsEmpty() { val p = new Properties() val config = LogConfig(p)