apoorvmittal10 commented on code in PR #19500: URL: https://github.com/apache/kafka/pull/19500#discussion_r2049196483
########## core/src/main/scala/kafka/server/BrokerServer.scala: ########## @@ -428,8 +428,8 @@ class BrokerServer( val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards)) val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache( - config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize, - KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS) + config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize Review Comment: We will replace this with new configuration in subsequent PRs, correct? ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -140,43 +125,19 @@ public synchronized void touch(ShareSession session, long now) { } } - /** - * Try to evict an entry from the session cache. - * <p> - * A proposed new element A may evict an existing element B if: - * B is considered "stale" because it has been inactive for a long time. - * - * @param now The current time in milliseconds. - * @return True if an entry was evicted; false otherwise. - */ - public synchronized boolean tryEvict(long now) { - // Try to evict an entry which is stale. - Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = lastUsed.firstEntry(); - if (lastUsedEntry == null) { - return false; - } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) { - ShareSession session = lastUsedEntry.getValue(); - remove(session); - evictionsMeter.mark(); Review Comment: What about evictionsMeter metric i.e. where it will get updated in new design? Should it go in `remove` method now? Having said that, you were right that now there is actually no eviction per se i.e. no forceful eviction based on size hence we might not require the metrics. What do you think? ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -140,43 +125,19 @@ public synchronized void touch(ShareSession session, long now) { } } - /** - * Try to evict an entry from the session cache. - * <p> - * A proposed new element A may evict an existing element B if: - * B is considered "stale" because it has been inactive for a long time. - * - * @param now The current time in milliseconds. - * @return True if an entry was evicted; false otherwise. - */ - public synchronized boolean tryEvict(long now) { - // Try to evict an entry which is stale. - Map.Entry<LastUsedKey, ShareSession> lastUsedEntry = lastUsed.firstEntry(); - if (lastUsedEntry == null) { - return false; - } else if (now - lastUsedEntry.getKey().lastUsedMs() > evictionMs) { - ShareSession session = lastUsedEntry.getValue(); - remove(session); - evictionsMeter.mark(); Review Comment: If not eviction then should we have `ShareSessionRemovePerSec` meter, will it be of any help? ########## server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java: ########## @@ -44,75 +43,45 @@ public void setUp() { @Test public void testShareSessionCache() throws InterruptedException { - ShareSessionCache cache = new ShareSessionCache(3, 100); + ShareSessionCache cache = new ShareSessionCache(3); assertEquals(0, cache.size()); - ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10)); - ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20)); - ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30)); - assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40))); - assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5))); + ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10)); + ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20)); + ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30)); + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40))); + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5))); assertShareCacheContains(cache, List.of(key1, key2, key3)); - TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3, - "Share session count should be 3."); - TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60, - "Share partition count should be 60."); - assertEquals(0, cache.evictionsMeter().count()); - - // Touch the sessions to update the last used time, so that the key-2 can be evicted. - cache.touch(cache.get(key1), 200); Review Comment: Shaouldn't we use cache.updateNumPartitions here? ########## server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java: ########## @@ -44,75 +43,45 @@ public void setUp() { @Test public void testShareSessionCache() throws InterruptedException { - ShareSessionCache cache = new ShareSessionCache(3, 100); + ShareSessionCache cache = new ShareSessionCache(3); assertEquals(0, cache.size()); - ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10)); - ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20)); - ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30)); - assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40))); - assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5))); + ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(10)); + ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(20)); + ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(30)); + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(40))); + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), mockedSharePartitionMap(5))); assertShareCacheContains(cache, List.of(key1, key2, key3)); - TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_SESSIONS_COUNT).intValue() == 3, - "Share session count should be 3."); - TestUtils.waitForCondition(() -> yammerMetricValue(ShareSessionCache.SHARE_PARTITIONS_COUNT).intValue() == 60, - "Share partition count should be 60."); - assertEquals(0, cache.evictionsMeter().count()); Review Comment: Why session and partitions count metric is removed in test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org