junrao commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1907576962
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -509,8 +509,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
- leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
- dir, topicPartition, logDirFailureChannel, logIdent, leaderEpochCache,
scheduler)
+ leaderEpochCache = UnifiedLog.createLeaderEpochCache(
Review Comment:
This is an existing issue. `initializeLeaderEpochCache` would be better named
`reinitializeLeaderEpochCache`.
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -264,68 +265,6 @@ class ReplicaManagerTest {
when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
}
- @Test
- def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
Review Comment:
This test is still useful. We can just leave
`partition.futureLog.get.leaderEpochCache` unchanged. It will be initialized as
empty, and the fetcher thread will default to initializing the fetch offset with
the HWM. We probably want to adjust the test name accordingly.
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1768,7 +1759,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
lock synchronized {
localLog.checkIfMemoryMappedBufferClosed()
producerExpireCheck.cancel(true)
- leaderEpochCache.foreach(_.clear())
+ // `renameDir` with `shouldReinitialize=false` sets this to `null` and
it's usually (but not always) called before this method
+ if (leaderEpochCache != null)
+ leaderEpochCache.clear()
Review Comment:
Yes, I agree that we can just call `clear` on `leaderEpochCache` instead of
setting it to null in `renameDir`.
##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't
change")
}
+ @ParameterizedTest
+ @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+ def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType):
Unit = {
+ val compression = Compression.of(compressionType).build()
+ val largeMessageKey = 20
+ val (largeMessageValue, largeMessageSet) =
createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0,
compression)
+ val maxMessageSize = compression match {
+ case Compression.NONE => largeMessageSet.sizeInBytes
+ case _ =>
+ // the broker assigns absolute offsets for message format 0 which
potentially causes the compressed size to
+ // increase because the broker offsets are larger than the ones
assigned by the client
+ // adding `6` to the message set size is good enough for this test: it
covers the increased message size while
+ // still being less than the overhead introduced by the conversion
from message format version 0 to 1
+ largeMessageSet.sizeInBytes + 6
Review Comment:
Ah. This is needed for appending the initial message to the log, not during
cleaning.
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1680,9 +1671,9 @@ public FetchDataInfo read(RemoteStorageFetchInfo
remoteStorageFetchInfo) throws
OptionalInt epoch = OptionalInt.empty();
if (logOptional.isPresent()) {
- Option<LeaderEpochFileCache> leaderEpochCache =
logOptional.get().leaderEpochCache();
- if (leaderEpochCache != null && leaderEpochCache.isDefined()) {
Review Comment:
Yes, we can remove this check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]