mumrah commented on a change in pull request #10256:
URL: https://github.com/apache/kafka/pull/10256#discussion_r587767322



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
+    topicPartition: TopicPartition,
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
+  ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(

Review comment:
       👍  Makes sense to move this here since KafkaMetadataLog fully owns the 
Log's lifecycle

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -297,10 +288,43 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
+    topicPartition: TopicPartition,
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
+  ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(
+      dir = dataDir,
+      config = defaultLogConfig,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = scheduler,
+      brokerTopicStats = new BrokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = Int.MaxValue,
+      producerIdExpirationCheckIntervalMs = Int.MaxValue,
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = false
+    )
+
+    KafkaMetadataLog(log, topicPartition, maxFetchSizeInBytes)
+  }
+
+  private def apply(

Review comment:
       Do we still need this additional private factory? Is it used by tests or 
something? Looks like it's just called from a few lines above, maybe we can 
just combine the two?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -310,44 +288,109 @@ final class KafkaMetadataLogTest {
 
     log.close()
 
-    val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val secondLog = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(snapshotId, secondLog.latestSnapshotId.get)
     assertEquals(snapshotId.offset, secondLog.startOffset)
     assertEquals(snapshotId.epoch, secondLog.lastFetchedEpoch)
     assertEquals(snapshotId.offset, secondLog.endOffset().offset)
     assertEquals(snapshotId.offset, secondLog.highWatermark.offset)
   }
+
+  @Test
+  def testMaxBatchSize(): Unit = {
+    val leaderEpoch = 5
+    val maxBatchSizeInBytes = 16384
+    val recordSize = 64

Review comment:
       Does it matter that maxBatchSizeInBytes is a multiple of the record 
size? Could you have done 
   
   > buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + 1) 
   
   to get the same RecordTooLargeException?

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -220,25 +220,14 @@ class KafkaRaftManager[T](
   }
 
   private def buildMetadataLog(): KafkaMetadataLog = {
-    val defaultProps = LogConfig.extractLogConfigMap(config)

Review comment:
       I see we are no longer calling this, but rather explicitly populating 
the log properties. Are there other log configs we might want to expose? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to