Repository: kafka
Updated Branches:
  refs/heads/trunk 786867c2e -> 503bd3664


KAFKA-2436; log.retention.hours should be honored by LogManager

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Joel Koshy, Gwen Shapira

Closes #142 from lindong28/KAFKA-2436


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366

Branch: refs/heads/trunk
Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5
Parents: 786867c
Author: Dong Lin <lindon...@gmail.com>
Authored: Tue Aug 18 13:03:11 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Tue Aug 18 13:03:11 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogConfig.scala   |  5 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  8 +--
 .../main/scala/kafka/server/KafkaServer.scala   | 65 ++++++++++----------
 .../scala/unit/kafka/log/LogConfigTest.scala    | 19 ++++++
 4 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c969d16..7fc7e33 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,7 +47,10 @@ object Defaults {
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props, false) {
-
+  /**
+   * Important note: Any configuration parameter that is passed along from 
KafkaConfig to LogConfig
+   * should also go in copyKafkaConfigToLog.
+   */
   val segmentSize = getInt(LogConfig.SegmentBytesProp)
   val segmentMs = getLong(LogConfig.SegmentMsProp)
   val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 394f21b..c39402c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -335,7 +335,7 @@ object KafkaConfig {
   val LogRetentionTimeHoursDoc = "The number of hours to keep a log file 
before deleting it (in hours), tertiary to " + LogRetentionTimeMillisProp + " 
property"
 
   val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
-  val LogCleanupIntervalMsDoc = "The frequency in minutes that the log cleaner 
checks whether any log is eligible for deletion"
+  val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log 
cleaner checks whether any log is eligible for deletion"
   val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond 
the retention window, must be either \"delete\" or \"compact\""
   val LogCleanerThreadsDoc = "The number of background threads to use for log 
cleaning"
   val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so 
that the sum of its read and write i/o will be less than this value on average"
@@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val logRollTimeMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * 
getInt(KafkaConfig.LogRollTimeHoursProp))
   val logRollTimeJitterMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
   val logFlushIntervalMs: java.lang.Long = 
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+  val logRetentionTimeMillis = getLogRetentionTimeMillis
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
-  val logPreAllocateEnable: Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
+  val logPreAllocateEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  val uncleanLeaderElectionEnable = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
   val interBrokerSecurityProtocol = 
SecurityProtocol.valueOf(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
   val interBrokerProtocolVersion = 
ApiVersion(getString(KafkaConfig.InterBrokerProtocolVersionProp))
 
@@ -713,7 +714,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
 
   val listeners = getListeners
   val advertisedListeners = getAdvertisedListeners
-  val logRetentionTimeMillis = getLogRetentionTimeMillis
 
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6d65507..a0e3fdf 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,6 +43,36 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
 
+
+object KafkaServer {
+  // Copy the subset of properties that are relevant to Logs
+  // I'm listing out individual properties here since the names are slightly 
different in each Config class...
+  def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, 
Object] = {
+    val logProps = new util.HashMap[String, Object]()
+    logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
+    logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
+    logProps.put(LogConfig.SegmentJitterMsProp, 
kafkaConfig.logRollTimeJitterMillis)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 
kafkaConfig.logIndexSizeMaxBytes)
+    logProps.put(LogConfig.FlushMessagesProp, 
kafkaConfig.logFlushIntervalMessages)
+    logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
+    logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
+    logProps.put(LogConfig.RetentionMsProp, 
kafkaConfig.logRetentionTimeMillis: java.lang.Long)
+    logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 
kafkaConfig.logIndexIntervalBytes)
+    logProps.put(LogConfig.DeleteRetentionMsProp, 
kafkaConfig.logCleanerDeleteRetentionMs)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, 
kafkaConfig.logCleanerMinCleanRatio)
+    logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
+    logProps.put(LogConfig.MinInSyncReplicasProp, 
kafkaConfig.minInSyncReplicas)
+    logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
+    logProps.put(LogConfig.UncleanLeaderElectionEnableProp, 
kafkaConfig.uncleanLeaderElectionEnable)
+    logProps.put(LogConfig.PreAllocateEnableProp, 
kafkaConfig.logPreAllocateEnable)
+    logProps
+  }
+}
+
+
+
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all 
functionality required
  * to start up and shutdown a single Kafka node.
@@ -387,7 +417,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def boundPort(): Int = socketServer.boundPort()
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): 
LogManager = {
-    val defaultProps = copyKafkaConfigToLog(config.originals)
+    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
     val configs = 
AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,
 _))
@@ -413,39 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    time = time)
   }
 
-  // Copy the subset of properties that are relevant to Logs
-  // I'm listing out individual properties here since the names are slightly 
different in each Config class...
-  private def copyKafkaConfigToLog(serverProps: java.util.Map[String, 
Object]): java.util.Map[String, Object] = {
-
-    val logProps = new util.HashMap[String, Object]()
-    val entryset = serverProps.entrySet.iterator
-    while (entryset.hasNext) {
-      val entry = entryset.next
-      entry.getKey match {
-        case KafkaConfig.LogSegmentBytesProp => 
logProps.put(LogConfig.SegmentBytesProp, entry.getValue)
-        case KafkaConfig.LogRollTimeMillisProp => 
logProps.put(LogConfig.SegmentMsProp, entry.getValue)
-        case KafkaConfig.LogRollTimeJitterMillisProp => 
logProps.put(LogConfig.SegmentJitterMsProp, entry.getValue)
-        case KafkaConfig.LogIndexSizeMaxBytesProp => 
logProps.put(LogConfig.SegmentIndexBytesProp, entry.getValue)
-        case KafkaConfig.LogFlushIntervalMessagesProp => 
logProps.put(LogConfig.FlushMessagesProp, entry.getValue)
-        case KafkaConfig.LogFlushIntervalMsProp => 
logProps.put(LogConfig.FlushMsProp, entry.getValue)
-        case KafkaConfig.LogRetentionBytesProp => 
logProps.put(LogConfig.RetentionBytesProp, entry.getValue)
-        case KafkaConfig.LogRetentionTimeMillisProp => 
logProps.put(LogConfig.RetentionMsProp, entry.getValue)
-        case KafkaConfig.MessageMaxBytesProp => 
logProps.put(LogConfig.MaxMessageBytesProp, entry.getValue)
-        case KafkaConfig.LogIndexIntervalBytesProp => 
logProps.put(LogConfig.IndexIntervalBytesProp, entry.getValue)
-        case KafkaConfig.LogCleanerDeleteRetentionMsProp => 
logProps.put(LogConfig.DeleteRetentionMsProp, entry.getValue)
-        case KafkaConfig.LogDeleteDelayMsProp => 
logProps.put(LogConfig.FileDeleteDelayMsProp, entry.getValue)
-        case KafkaConfig.LogCleanerMinCleanRatioProp => 
logProps.put(LogConfig.MinCleanableDirtyRatioProp, entry.getValue)
-        case KafkaConfig.LogCleanupPolicyProp => 
logProps.put(LogConfig.CleanupPolicyProp, entry.getValue)
-        case KafkaConfig.MinInSyncReplicasProp => 
logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
-        case KafkaConfig.CompressionTypeProp => 
logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
-        case KafkaConfig.UncleanLeaderElectionEnableProp => 
logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
-        case KafkaConfig.LogPreAllocateProp => 
logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
-        case _ => // we just leave those out
-      }
-    }
-    logProps
-  }
-
   /**
     * Generates new brokerId or reads from meta.properties based on following 
conditions
     * <ol>

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 348b012..51cd62c 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -19,13 +19,32 @@ package kafka.log
 
 import java.util.Properties
 
+import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
+import org.junit.Assert._
 import org.scalatest.Assertions._
 
 class LogConfigTest {
 
   @Test
+  def testKafkaConfigToProps() {
+    val millisInHour = 60L * 60L * 1000L
+    val kafkaProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    kafkaProps.put(KafkaConfig.LogRollTimeHoursProp, "2")
+    kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2")
+    kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
+
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig)
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp))
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp))
+    assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp))
+  }
+
+  @Test
   def testFromPropsEmpty() {
     val p = new Properties()
     val config = LogConfig(p)

Reply via email to