kamalcph commented on code in PR #18004: URL: https://github.com/apache/kafka/pull/18004#discussion_r1959295984
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -553,29 +554,47 @@ class RemoteIndexCacheTest { @Test def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + def getRemoteLogSegMetadataIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = { + metadataToVerify.filter(s => { cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())}) + } - def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, entryToVerify: Entry): Unit = { - // wait until `entryToVerify` is marked for deletion - TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup, + def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], entriesToVerify: List[Entry], + numOfMarkAsDeleted: Int): (List[RemoteLogSegmentMetadata], List[Entry]) = { + TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted), "Failed to mark evicted cache entry for cleanup after resizing cache.") - TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted, + + TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted), "Failed to cleanup evicted cache entry after resizing cache.") - // verify no index files for `entryToVerify` on remote cache dir - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty, - s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty, - s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty, - s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty, - s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + + val entriesIsMarkedForCleanup = entriesToVerify.filter(_.isMarkedForCleanup) + val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted) + // clean up entries and clean start entries should be the same + assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted)) + + // get the logSegMetadata are evicted + val metedataDeleted = metadataToVerify.filter(s => { !cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())}) + assertEquals(numOfMarkAsDeleted, metedataDeleted.size) + for (metadata <- metedataDeleted) { + // verify no index files for `entryToVerify` on remote cache dir + TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isPresent, + s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isPresent, + s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isPresent, + s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadata)).isPresent, + s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + } + (metedataDeleted, entriesIsMarkedForCleanup) } - def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = { - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty) + def verifyEntryIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): Unit = { + for (metadata <- metadataToVerify) { + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isPresent) + assertTrue(!getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadata)).isPresent) Review Comment: same as above. 1. Filename mismatch and 2. invert the condition, use `isEmpty` for readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org