TaiJuWu commented on code in PR #18004:
URL: https://github.com/apache/kafka/pull/18004#discussion_r1959677832


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+    }
 
-    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
-      // wait until `entryToVerify` is marked for deletion
-      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+    def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], 
entriesToVerify: List[Entry],
+                             numOfMarkAsDeleted: Int): 
(List[RemoteLogSegmentMetadata], List[Entry]) = {
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
         "Failed to mark evicted cache entry for cleanup after resizing cache.")
-      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
         "Failed to cleanup evicted cache entry after resizing cache.")
-      // verify no index files for `entryToVerify` on remote cache dir
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
-        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isEmpty,
-        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
-        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
-        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+
+      val entriesIsMarkedForCleanup = 
entriesToVerify.filter(_.isMarkedForCleanup)
+      val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
+      // clean up entries and clean start entries should be the same
+      assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
+
+      // get the logSegMetadata are evicted
+      val metedataDeleted = metadataToVerify.filter(s => { 
!cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+      assertEquals(numOfMarkAsDeleted, metedataDeleted.size)
+      for (metadata <- metedataDeleted) {
+        // verify no index files for `entryToVerify` on remote cache dir
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadata)).isPresent,
+          s"Offset index file for evicted entry should not be present on disk 
at ${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadata)).isPresent,
+          s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadata)).isPresent,
+          s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadata)).isPresent,

Review Comment:
   You are correct. I removed 
[remoteDeletedSuffixIndexFileName](https://github.com/apache/kafka/pull/18004/commits/401d2f75c46335e808fc8386b7e2f5e0222d165f)
 from the production code and added a specific comment for this in 
https://github.com/apache/kafka/pull/18004/commits/401d2f75c46335e808fc8386b7e2f5e0222d165f.
   
   The other comments are addressed in 
https://github.com/apache/kafka/pull/18004/commits/877e08a999e455778a7efe2b94c199d9eaf23859



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to