junrao commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1907576962


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -509,8 +509,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
-    leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
-      dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache, 
scheduler)
+    leaderEpochCache = UnifiedLog.createLeaderEpochCache(

Review Comment:
   This is an existing issue. `initializeLeaderEpochCache` is better named as 
`reinitializeLeaderEpochCache`.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -264,68 +265,6 @@ class ReplicaManagerTest {
     when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
-  @Test
-  def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {

Review Comment:
   This test is still useful. We can just leave 
`partition.futureLog.get.leaderEpochCache` unchanged. It will be initialized as 
empty and the fetcher thread will default to initializing the fetch offset with 
HWM. We probably want to adjust the test name accordingly.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1768,7 +1759,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       lock synchronized {
         localLog.checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        leaderEpochCache.foreach(_.clear())
+        // `renameDir` with `shouldReinitialize=false` sets this to `null` and 
it's usually (but not always) called before this method
+        if (leaderEpochCache != null)
+          leaderEpochCache.clear()

Review Comment:
   Yes, I agree that we can just call `clear` on `leaderEpochCache` instead of 
setting it to null in `renameDir`.



##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
     assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't 
change")
   }
 
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): 
Unit = {
+    val compression = Compression.of(compressionType).build()
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, 
compression)
+    val maxMessageSize = compression match {
+      case Compression.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which 
potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones 
assigned by the client
+        // adding `6` to the message set size is good enough for this test: it 
covers the increased message size while
+        // still being less than the overhead introduced by the conversion 
from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 6

Review Comment:
   Ah. This is needed for appending the initial message to the log, not during 
cleaning.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1680,9 +1671,9 @@ public FetchDataInfo read(RemoteStorageFetchInfo 
remoteStorageFetchInfo) throws
         OptionalInt epoch = OptionalInt.empty();
 
         if (logOptional.isPresent()) {
-            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
-            if (leaderEpochCache != null && leaderEpochCache.isDefined()) {

Review Comment:
   Yes, we can remove this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to