apoorvmittal10 commented on code in PR #19500:
URL: https://github.com/apache/kafka/pull/19500#discussion_r2049196483


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -428,8 +428,8 @@ class BrokerServer(
       val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))
 
       val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
-        config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
-        KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
+        config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize

Review Comment:
   We will replace this with a new configuration in subsequent PRs, correct?



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -140,43 +125,19 @@ public synchronized void touch(ShareSession session, long now) {
         }
     }
 
-    /**
-     * Try to evict an entry from the session cache.
-     * <p>
-     * A proposed new element A may evict an existing element B if:
-     * B is considered "stale" because it has been inactive for a long time.
-     *
-     * @param now        The current time in milliseconds.
-     * @return           True if an entry was evicted; false otherwise.
-     */
-    public synchronized boolean tryEvict(long now) {
-        // Try to evict an entry which is stale.
-        Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = lastUsed.firstEntry();
-        if (lastUsedEntry == null) {
-            return false;
-        } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
-            ShareSession session = lastUsedEntry.getValue();
-            remove(session);
-            evictionsMeter.mark();

Review Comment:
   What about the `evictionsMeter` metric, i.e. where will it get updated in the
new design? Should it go in the `remove` method now? That said, you were right
that there is now no eviction per se, i.e. no forceful eviction based on size,
so we might not need the metric at all. What do you think?
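
To make the `remove` option concrete, here is a minimal sketch, not the Kafka implementation. `SessionCacheSketch`, its fields, and the `LongAdder` counter standing in for a Yammer meter are all hypothetical names chosen for illustration. It assumes a cache that rejects new sessions when full instead of evicting, and counts only explicit removals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a share session cache; not the Kafka class.
class SessionCacheSketch {
    private final int maxEntries;
    // Maps a session key to its partition count (simplified session state).
    private final Map<String, Integer> sessions = new HashMap<>();
    // Stand-in for a meter: counts successful removals only.
    private final LongAdder removals = new LongAdder();

    SessionCacheSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Returns the key if the session was stored, or null when the cache is full.
    // Nothing is evicted to make room.
    synchronized String maybeCreateSession(String key, int numPartitions) {
        if (sessions.size() >= maxEntries) {
            return null;
        }
        sessions.put(key, numPartitions);
        return key;
    }

    // Removes the session and records the removal if the key was present.
    synchronized boolean remove(String key) {
        boolean removed = sessions.remove(key) != null;
        if (removed) {
            removals.increment();
        }
        return removed;
    }

    synchronized int size() {
        return sessions.size();
    }

    long removalCount() {
        return removals.sum();
    }
}
```

With this shape the counter moves only on explicit removal, so it would describe session churn rather than size-based eviction.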



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -140,43 +125,19 @@ public synchronized void touch(ShareSession session, long now) {
         }
     }
 
-    /**
-     * Try to evict an entry from the session cache.
-     * <p>
-     * A proposed new element A may evict an existing element B if:
-     * B is considered "stale" because it has been inactive for a long time.
-     *
-     * @param now        The current time in milliseconds.
-     * @return           True if an entry was evicted; false otherwise.
-     */
-    public synchronized boolean tryEvict(long now) {
-        // Try to evict an entry which is stale.
-        Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = lastUsed.firstEntry();
-        if (lastUsedEntry == null) {
-            return false;
-        } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) {
-            ShareSession session = lastUsedEntry.getValue();
-            remove(session);
-            evictionsMeter.mark();

Review Comment:
   If this is not eviction, should we have a `ShareSessionRemovePerSec` meter
instead? Would it be of any help?



##########
server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java:
##########
@@ -44,75 +43,45 @@ public void setUp() {
 
     @Test
     public void testShareSessionCache() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(3, 100);
+        ShareSessionCache cache = new ShareSessionCache(3);
         assertEquals(0, cache.size());
-        ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
-        ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20));
-        ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40)));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5)));
+        ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10));
+        ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20));
+        ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40)));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5)));
         assertShareCacheContains(cache, List.of(key1, key2, key3));
 
-        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
-            "Share session count should be 3.");
-        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
-            "Share partition count should be 60.");
-        assertEquals(0, cache.evictionsMeter().count());
-
-        // Touch the sessions to update the last used time, so that the key-2 can be evicted.
-        cache.touch(cache.get(key1), 200);

Review Comment:
   Shouldn't we use `cache.updateNumPartitions` here?



##########
server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java:
##########
@@ -44,75 +43,45 @@ public void setUp() {
 
     @Test
     public void testShareSessionCache() throws InterruptedException {
-        ShareSessionCache cache = new ShareSessionCache(3, 100);
+        ShareSessionCache cache = new ShareSessionCache(3);
         assertEquals(0, cache.size());
-        ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10));
-        ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20));
-        ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40)));
-        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5)));
+        ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10));
+        ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20));
+        ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40)));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5)));
         assertShareCacheContains(cache, List.of(key1, key2, key3));
 
-        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3,
-            "Share session count should be 3.");
-        TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60,
-            "Share partition count should be 60.");
-        assertEquals(0, cache.evictionsMeter().count());

Review Comment:
   Why are the session and partition count metric checks removed from the test?


