kamalcph commented on code in PR #18004:
URL: https://github.com/apache/kafka/pull/18004#discussion_r1959288862


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+    }
 
-    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
-      // wait until `entryToVerify` is marked for deletion
-      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+    def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], 
entriesToVerify: List[Entry],
+                             numOfMarkAsDeleted: Int): 
(List[RemoteLogSegmentMetadata], List[Entry]) = {
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
         "Failed to mark evicted cache entry for cleanup after resizing cache.")
-      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
         "Failed to cleanup evicted cache entry after resizing cache.")
-      // verify no index files for `entryToVerify` on remote cache dir
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
-        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isEmpty,
-        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
-        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
-        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+
+      val entriesIsMarkedForCleanup = 
entriesToVerify.filter(_.isMarkedForCleanup)
+      val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
+      // clean up entries and clean start entries should be the same
+      assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
+
+      // get the logSegMetadata are evicted
+      val metedataDeleted = metadataToVerify.filter(s => { 
!cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})

Review Comment:
   nit: `metedataDeleted` -> `metadataDeleted`
   
   Also, as with `getRemoteLogSegMetadataIsKept`, please use `containsKey()` instead of `keySet().contains()`.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+    }
 
-    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
-      // wait until `entryToVerify` is marked for deletion
-      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+    def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], 
entriesToVerify: List[Entry],
+                             numOfMarkAsDeleted: Int): 
(List[RemoteLogSegmentMetadata], List[Entry]) = {
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
         "Failed to mark evicted cache entry for cleanup after resizing cache.")
-      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
         "Failed to cleanup evicted cache entry after resizing cache.")
-      // verify no index files for `entryToVerify` on remote cache dir
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
-        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isEmpty,
-        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
-        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
-        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+
+      val entriesIsMarkedForCleanup = 
entriesToVerify.filter(_.isMarkedForCleanup)
+      val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
+      // clean up entries and clean start entries should be the same
+      assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
+
+      // get the logSegMetadata are evicted
+      val metedataDeleted = metadataToVerify.filter(s => { 
!cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+      assertEquals(numOfMarkAsDeleted, metedataDeleted.size)
+      for (metadata <- metedataDeleted) {
+        // verify no index files for `entryToVerify` on remote cache dir
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadata)).isPresent,
+          s"Offset index file for evicted entry should not be present on disk 
at ${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadata)).isPresent,
+          s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadata)).isPresent,
+          s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadata)).isPresent,

Review Comment:
   Is this correct? `remoteDeletedSuffixIndexFileName(metadata)` returns 
the filename `offset_segment-uuid.deleted`.
   
   But the actual filenames are `offset_segment-uuid.index.deleted`, 
`offset_segment-uuid.timeindex.deleted`, and 
`offset_segment-uuid.txnindex.deleted`.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+    }
 
-    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
-      // wait until `entryToVerify` is marked for deletion
-      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+    def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], 
entriesToVerify: List[Entry],
+                             numOfMarkAsDeleted: Int): 
(List[RemoteLogSegmentMetadata], List[Entry]) = {
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
         "Failed to mark evicted cache entry for cleanup after resizing cache.")
-      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
         "Failed to cleanup evicted cache entry after resizing cache.")
-      // verify no index files for `entryToVerify` on remote cache dir
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
-        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isEmpty,
-        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
-        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
-        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+
+      val entriesIsMarkedForCleanup = 
entriesToVerify.filter(_.isMarkedForCleanup)
+      val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
+      // clean up entries and clean start entries should be the same
+      assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
+
+      // get the logSegMetadata are evicted
+      val metedataDeleted = metadataToVerify.filter(s => { 
!cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+      assertEquals(numOfMarkAsDeleted, metedataDeleted.size)
+      for (metadata <- metedataDeleted) {
+        // verify no index files for `entryToVerify` on remote cache dir
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadata)).isPresent,

Review Comment:
   use `isEmpty` instead of negating `isPresent`



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})

Review Comment:
   nit: use containsKey instead of keySet().contains()
   
   ```
   cache.internalCache().asMap().containsKey(...)
   ```



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+    def getRemoteLogSegMetadataIsKept(metadataToVerify: 
List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
+      metadataToVerify.filter(s => { 
cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+    }
 
-    def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, 
entryToVerify: Entry): Unit = {
-      // wait until `entryToVerify` is marked for deletion
-      TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
+    def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], 
entriesToVerify: List[Entry],
+                             numOfMarkAsDeleted: Int): 
(List[RemoteLogSegmentMetadata], List[Entry]) = {
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
         "Failed to mark evicted cache entry for cleanup after resizing cache.")
-      TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
+
+      TestUtils.waitUntilTrue(() => 
entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
         "Failed to cleanup evicted cache entry after resizing cache.")
-      // verify no index files for `entryToVerify` on remote cache dir
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
-        s"Offset index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isEmpty,
-        s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
-        s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
-      TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
-        s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+
+      val entriesIsMarkedForCleanup = 
entriesToVerify.filter(_.isMarkedForCleanup)
+      val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
+      // clean up entries and clean start entries should be the same
+      assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
+
+      // get the logSegMetadata are evicted
+      val metedataDeleted = metadataToVerify.filter(s => { 
!cache.internalCache().asMap().keySet().contains(s.remoteLogSegmentId().id())})
+      assertEquals(numOfMarkAsDeleted, metedataDeleted.size)
+      for (metadata <- metedataDeleted) {
+        // verify no index files for `entryToVerify` on remote cache dir
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadata)).isPresent,
+          s"Offset index file for evicted entry should not be present on disk 
at ${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadata)).isPresent,
+          s"Time index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadata)).isPresent,
+          s"Txn index file for evicted entry should not be present on disk at 
${cache.cacheDir()}")
+        TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadata)).isPresent,
+          s"Index file marked for deletion for evicted entry should not be 
present on disk at ${cache.cacheDir()}")
+      }
+      (metedataDeleted, entriesIsMarkedForCleanup)
     }
 
-    def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = {
-      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadataToVerify)).isPresent)
-      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadataToVerify)).isPresent)
-      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadataToVerify)).isPresent)
-      assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty)
+    def verifyEntryIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): 
Unit = {
+      for (metadata <- metadataToVerify) {
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteOffsetIndexFileName(metadata)).isPresent)
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTimeIndexFileName(metadata)).isPresent)
+        assertTrue(getIndexFileFromRemoteCacheDir(cache, 
remoteTransactionIndexFileName(metadata)).isPresent)
+        assertTrue(!getIndexFileFromRemoteCacheDir(cache, 
remoteDeletedSuffixIndexFileName(metadata)).isPresent)

Review Comment:
   Same as above:
   
   1. Filename mismatch, and
   2. drop the negation for readability: use `assertFalse` with `isPresent` (or `assertTrue` with `isEmpty`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to