showuon commented on code in PR #16765:
URL: https://github.com/apache/kafka/pull/16765#discussion_r1703885563
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -4188,11 +4188,77 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
+ @Test
+ def testRetentionOnLocalLogDeletionWhenRemoteCopyDisabled(): Unit = {
+ def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+ val segmentBytes = createRecords.sizeInBytes()
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+ // Given 10 segments of 1 message each
+ for (_ <- 0 until 10) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(10, log.logSegments.size)
+
+ log.updateHighWatermark(log.logEndOffset)
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ log.deleteOldSegments()
+ assertEquals(8, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // add remoteCopyDisabled = true
+ val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteCopyDisabled = true)
+ log.updateConfig(copyDisabledLogConfig)
+
+ // No local logs will be deleted even though local retention bytes is 1, because there are still logs in remote storage
+ log.deleteOldSegments()
+ assertEquals(8, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // simulate the remote logs are all deleted due to retention policy
+ log.updateLogStartOffsetFromRemoteTier(2)
+ assertEquals(8, log.logSegments.size())
+ assertEquals(2, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+
+ // try to delete local logs again; 3 segments will be deleted this time because log start offset == local log start offset,
+ // which means remote storage is empty. We'll treat this log as local-only and use retention.bytes for the retention policy.
Review Comment:
@kamalcph , your comment makes me think of one situation where we can't handle local log deletion well.
Suppose we set `local.retention.ms = 1 min`, `retention.ms = 10 min`.
00:00 -> produced and uploaded segment0 to remote storage, then set `remote.log.copy.disable = true`.
00:01 -> we expect to delete the local segment0, but because `log start offset == local log start offset == 0`, we can't delete local logs until 10 minutes later.
So if `retention.bytes = 10TB` and `local.retention.bytes = 10GB`, that would be very bad.
I was thinking we could add one more check: `(logStartOffset != localLogStartOffset() || localLogStartOffset() < highestOffsetInRemoteStorage())`. That way, we can tell whether there are "pending local log segments to be deleted". But if, like you said, another broker has a different segment.baseOffset, say:
- localLogStartOffset = 9,
- highestOffsetInRemoteStorage() = 10
then because remote copy is disabled, `highestOffsetInRemoteStorage` won't increase anymore, so we can't delete the local logs, either.
I can't come up with a better solution. I'd like to hear your thoughts.
@kamalcph @satishd
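To make the stuck case concrete, here is a minimal sketch of the proposed predicate (hypothetical helper names, not the actual `UnifiedLog` API). It shows the check staying true in the mismatched-base-offset case even though, with `highestOffsetInRemoteStorage` frozen, segment-level deletion can never make progress:

```java
public class PendingLocalDeletionCheck {
    // Proposed extra check: pending local segments exist if the log start
    // offset differs from the local log start offset, or the local log start
    // offset is still below the highest offset copied to remote storage.
    static boolean hasPendingLocalSegments(long logStartOffset,
                                           long localLogStartOffset,
                                           long highestOffsetInRemoteStorage) {
        return logStartOffset != localLogStartOffset
                || localLogStartOffset < highestOffsetInRemoteStorage;
    }

    public static void main(String[] args) {
        // Case from the test above: remote tier still holds data below the
        // local log start offset (logStartOffset = 0, localLogStartOffset = 2).
        System.out.println(hasPendingLocalSegments(0, 2, 1)); // true

        // Counterexample: a replica with different segment base offsets,
        // localLogStartOffset = 9, highestOffsetInRemoteStorage = 10. The
        // check is true, but because remote.log.copy.disable = true freezes
        // highestOffsetInRemoteStorage, segments at or above offset 10 never
        // become eligible for local-retention deletion.
        System.out.println(hasPendingLocalSegments(9, 9, 10)); // true
    }
}
```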
If we can't come up with a better solution, I propose we `always use "local.retention.ms/bytes" when "remote.log.copy.disable = true"`. Using `local.retention.ms/bytes` makes sense to me because it applies to the local logs. WDYT?
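As a rough sketch of that proposal (hypothetical names, not the real `LogConfig` or retention code): whenever remote storage is enabled, local segments would be bounded by `local.retention.ms/bytes`, including when `remote.log.copy.disable = true`:

```java
public class RetentionSelector {
    // Hypothetical (ms, bytes) pair of effective limits for local segments.
    record Limits(long ms, long bytes) {}

    // Proposal sketched here: remote storage enabled (copy disabled or not)
    // -> use the local limits; remote storage disabled -> full retention limits.
    static Limits proposedLocalLimits(boolean remoteStorageEnabled,
                                      long retentionMs, long retentionBytes,
                                      long localRetentionMs, long localRetentionBytes) {
        return remoteStorageEnabled
                ? new Limits(localRetentionMs, localRetentionBytes)
                : new Limits(retentionMs, retentionBytes);
    }

    public static void main(String[] args) {
        // The bad case above: retention.bytes = 10TB, local.retention.bytes = 10GB.
        long tenTb = 10L * 1024 * 1024 * 1024 * 1024;
        long tenGb = 10L * 1024 * 1024 * 1024;
        Limits l = proposedLocalLimits(true, 10 * 60_000L, tenTb, 60_000L, tenGb);
        // Local segments capped at 10GB instead of 10TB.
        System.out.println(l.bytes() == tenGb); // true
    }
}
```

This sidesteps the frozen-`highestOffsetInRemoteStorage` problem entirely, at the cost of local logs no longer honoring the full `retention.*` settings once remote storage has been enabled.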
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]