Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1949839653 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,67 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( Review Comment: Apologies, I missed it in review. Should have caught it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1949830538 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,67 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( Review Comment: Thanks @chia7712, makes sense.
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
chia7712 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1949798227 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,67 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( Review Comment: As a reminder, it's crucial to use a `try-finally` block to ensure the `ReplicaManager` is properly closed. Otherwise, a thread from the purgatory can be left running, which can cause failures in subsequent integration tests that use thread leak detection. ``` org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: executor-ShareFetch ==> expected: but was: ``` The error can also be reproduced with the following command. ``` ./gradlew core:test --tests ReplicaManagerTest --tests SaslApiVersionsRequestTest --tests LeaderEpochIntegrationTest --tests RequestQuotaTest -PmaxParallelForks=1 ``` I will fix it in https://issues.apache.org/jira/browse/KAFKA-18770
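The `try-finally` pattern described above can be sketched in plain Java. Everything in this sketch is hypothetical: `FakePurgatory` stands in for a component (such as the purgatory owned by `ReplicaManager`) that starts a background thread, named `executor-ShareFetch` here to match the error above, and `leakedThreads` is a simplified version of the thread-name check a test harness might run before each test class.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a component that owns a background thread.
class FakePurgatory implements AutoCloseable {
    private final Thread expirationThread;

    FakePurgatory() {
        expirationThread = new Thread(() -> {
            try {
                // Block until interrupted, like an idle expiration reaper.
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted by close(): exit so the thread terminates.
            }
        }, "executor-ShareFetch");
        expirationThread.setDaemon(true);
        expirationThread.start();
    }

    @Override
    public void close() {
        expirationThread.interrupt();
        try {
            // Wait until the thread has actually terminated.
            expirationThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ThreadLeakDemo {
    // Simplified leak check: names of live threads starting with the given prefix.
    static List<String> leakedThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .collect(Collectors.toList());
    }

    static void runTestBody() {
        FakePurgatory purgatory = new FakePurgatory();
        try {
            // ... exercise the component and make assertions here ...
        } finally {
            // Runs even if an assertion above throws, so the thread is never leaked.
            purgatory.close();
        }
    }
}
```

In the Scala test the same shape would apply: construct `rm`, run the test body inside `try`, and shut the `ReplicaManager` down in `finally`.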
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
AndrewJSchofield merged PR #18725: URL: https://github.com/apache/kafka/pull/18725
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933934456 ## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ## @@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation { private LinkedHashMap partitionsAcquired; private LinkedHashMap partitionsAlreadyFetched; -DelayedShareFetch( +/** + * This function returns an instance of delayed share fetch operation for completing share fetch requests instantaneously or with delay. Review Comment: done ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -1106,8 +1106,9 @@ boolean canAcquireRecords() { * the records are fetched and acquired. * * @return A boolean which indicates whether the fetch lock is acquired. + * Visible for testing. Review Comment: I have removed "Visible for testing"
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
AndrewJSchofield commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933895394 ## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ## @@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation { private LinkedHashMap partitionsAcquired; private LinkedHashMap partitionsAlreadyFetched; -DelayedShareFetch( +/** + * This function returns an instance of delayed share fetch operation for completing share fetch requests instantaneously or with delay. Review Comment: "Constructs" rather than "returns", I think. ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -1106,8 +1106,9 @@ boolean canAcquireRecords() { * the records are fetched and acquired. * * @return A boolean which indicates whether the fetch lock is acquired. + * Visible for testing. Review Comment: If you look at the javadoc, this sentence is part of the description for the return value. Either remove "Visible for testing" (preferred) or make sure it fits into the javadoc more nicely.
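The `@return` remark follows from how Javadoc parses block tags: a block tag's text runs until the next block tag or the end of the comment, so a sentence appended after `@return` is rendered as part of the return description. A minimal sketch of the layout the reviewers converged on, with a constructor summary that starts with "Constructs" and any extra note kept in the main description. The class and method names are hypothetical, not taken from `SharePartition`.

```java
/**
 * Hypothetical class, used only to illustrate where notes belong in a Javadoc comment.
 */
class LockJavadocExample {
    private boolean locked;

    /**
     * Constructs an example whose lock starts out released.
     */
    LockJavadocExample() {
        this.locked = false;
    }

    /**
     * Attempts to take the lock. Extra notes, if any are kept, go here in the
     * main description, because text placed after a block tag such as the
     * return tag is rendered as part of that tag.
     *
     * @return A boolean which indicates whether the lock is acquired.
     */
    boolean tryAcquireLock() {
        if (locked) {
            return false;
        }
        locked = true;
        return true;
    }
}
```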
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933704511 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: Actually, we should check the number of calls to `forceComplete`, since `onComplete` contains user logic and checking it could cause problems in the future.
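The distinction between `forceComplete` and `onComplete` can be illustrated with a simplified, hypothetical model of a delayed operation (not Kafka's `DelayedOperation`). The assumption is that an expiring timer calls `forceComplete`, which flips a completed flag at most once and only then runs `onComplete`; counting `forceComplete` calls therefore shows whether the expiration path ran, independent of what `onComplete` does. In the real test, a Mockito spy checked with `verify(..., times(1))` would play the role of the counter below.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a delayed operation.
abstract class SimpleDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    final AtomicInteger forceCompleteCalls = new AtomicInteger();

    // Returns true only for the caller that actually completes the operation.
    boolean forceComplete() {
        forceCompleteCalls.incrementAndGet();
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // Called by the (hypothetical) expiration timer once the delay has elapsed.
    void expire() {
        if (forceComplete()) {
            onExpiration();
        }
    }

    abstract void onComplete();

    void onExpiration() { }
}

class ShareFetchLikeOperation extends SimpleDelayedOperation {
    final CompletableFuture<String> future = new CompletableFuture<>();
    boolean expired = false;

    @Override
    void onComplete() {
        // User logic that may change over time, so tests avoid counting these calls.
        future.complete("empty response");
    }

    @Override
    void onExpiration() {
        expired = true;
    }
}
```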
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933651209 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) Review Comment: Actually, we require `partitionMaxBytes` to be of type `util.LinkedHashMap`, so we cannot use `mkMap`: the [build](https://github.com/apache/kafka/actions/runs/13029166140/job/36344429628?pr=18725) fails in `core:compileTestScala` if we use `mkMap`.
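The compile failure follows from static typing: if a helper's declared return type is `Map`, its result cannot be passed where a `LinkedHashMap` is required, even if the helper builds a `LinkedHashMap` internally. That `mkMap` declares `Map` as its return type is an assumption consistent with the failing build, not something verified here. A minimal sketch with a hypothetical `mkLinkedMap` helper whose declared type lets the compiler accept it for `LinkedHashMap` parameters:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;

class LinkedMapHelper {
    // Hypothetical helper: the declared LinkedHashMap return type (unlike a
    // Map-returning factory) can be passed to LinkedHashMap parameters.
    @SafeVarargs
    static <K, V> LinkedHashMap<K, V> mkLinkedMap(Map.Entry<K, V>... entries) {
        LinkedHashMap<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : entries) {
            // Insertion order of the entries is preserved.
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new SimpleEntry<>(key, value);
    }

    // Stand-in for a constructor parameter typed as LinkedHashMap.
    static int firstValue(LinkedHashMap<String, Integer> partitionMaxBytes) {
        return partitionMaxBytes.values().iterator().next();
    }
}
```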
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933639965 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: Ideally, `assertFalse(future.isDone)` should be enough, but I've added a check on the number of calls to `onComplete`.
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933626593 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) + +val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey] +partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId, topicIdPartition.partition))) + +// You cannot acquire records for sp1, so request will be stored in purgatory waiting for timeout. +when(sp1.maybeAcquireFetchLock).thenReturn(false) + +rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch, delayedShareFetchKeys = delayedShareFetchWatchKeys) +assertEquals(1, rm.delayedShareFetchPurgatory.watched) + +// Future is not complete initially. +assertFalse(future.isDone) +// Post timeout, share fetch request will timeout and the future should complete. +waitUntilTrue(() => future.isDone, "Processing in delayed share fetch purgatory never ended.", 1000) Review Comment: Yes, I added a comment about this. In this test case, the timeout is set to 500 ms, but I've kept an additional buffer of 500 ms so that the task always times out before the wait ends.
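The timing argument can be sketched outside Kafka: an operation that expires after 500 ms, checked by a poll that waits up to 1000 ms, leaves 500 ms of slack for scheduling delays. In this sketch a `ScheduledExecutorService` stands in for the purgatory's expiration timer, and `waitUntilTrue` is a hypothetical re-implementation of the polling helper, not Kafka's `TestUtils`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class TimeoutBufferDemo {
    // Hypothetical polling helper: returns true as soon as the condition holds,
    // or false if it is still false after waitMs.
    static boolean waitUntilTrue(BooleanSupplier condition, long waitMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMs);
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    // Completes a future after maxWaitMs, then polls for up to maxWaitMs + bufferMs.
    static boolean runScenario(long maxWaitMs, long bufferMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            timer.schedule(() -> future.complete("expired"), maxWaitMs, TimeUnit.MILLISECONDS);
            // The future must not be complete before the delay elapses.
            if (future.isDone()) {
                return false;
            }
            return waitUntilTrue(future::isDone, maxWaitMs + bufferMs);
        } finally {
            // Shut the timer down so its thread does not leak into later tests.
            timer.shutdownNow();
        }
    }
}
```

Because the poll returns as soon as the future completes, a larger buffer only lengthens the failing case, so erring on the generous side costs little in the passing case.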
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933620080 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: Hmmm, sure, but how do we guarantee that the request timed out and was not completed in the normal flow? By checking `assertFalse(future.isDone)` initially? Or should we also check the method call that should only happen when the timeout is triggered, i.e. `forceComplete`?
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933609359 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) + +val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new util.ArrayList[DelayedShareFetchKey] +partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId, topicIdPartition.partition))) + +// You cannot acquire records for sp1, so request will be stored in purgatory waiting for timeout. +when(sp1.maybeAcquireFetchLock).thenReturn(false) + +rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch, delayedShareFetchKeys = delayedShareFetchWatchKeys) +assertEquals(1, rm.delayedShareFetchPurgatory.watched) + +// Future is not complete initially. +assertFalse(future.isDone) +// Post timeout, share fetch request will timeout and the future should complete. +waitUntilTrue(() => future.isDone, "Processing in delayed share fetch purgatory never ended.", 1000) Review Comment: Shall we add a bit of buffer, say 1200, and note in a comment that 1000 is the default timeout but we keep a buffer of 200 so that the task always times out? ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + 
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: Hmmm, sure, but how do we guarantee that the request timed out and was not completed in the normal flow? By checking `assertFalse(future.isDone)` initial
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933573691 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: I think the relevant attributes of `DelayedShareFetch` have already been mocked, for example the exception handler. If we mocked `DelayedShareFetch` itself, we'd have to write a bunch of `when` statements for `delayedShareFetch`. 
Hence, I prefer an instance instead of a mock.
We are not changing the behaviour of `DelayedShareFetch`, and we need `shareFetch`.
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933573691 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) + +val delayedShareFetch = new DelayedShareFetch( + shareFetch, + rm, + mock(classOf[BiConsumer[SharePartitionKey, Throwable]]), + sharePartitions) Review Comment: I think the relevant attributes of `DelayedShareFetch` have already been mocked, for example the exception handler. If we mocked `DelayedShareFetch` itself, we'd have to write a bunch of `when` statements for `delayedShareFetch`. 
Hence, I prefer an instance instead of a mock.
We are not changing the behaviour of `DelayedShareFetch` anyway. ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest { ) } + @Test + def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = { +val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + +val groupId = "grp" +val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) +val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] +partitionMaxBytes.put(tp1, 1000) + +val sp1 = mock(classOf[SharePartition]) +val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition] +sharePartitions.put(tp1, sp1) + +val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] +val shareFetch = new ShareFetch( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true), + groupId, + Uuid.randomUuid.toString, + future, + partitionMaxBytes, + 500, + 100, + brokerTopicStats) Review Comment: Again, we need attributes of `ShareFetch` here, like `FetchParams.maxWaitMs`, so I don't think mocking it is the right approach.
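The argument against mocking `ShareFetch` can be illustrated with a tiny stand-in for a mocking library's default answer: unstubbed methods typically return zero, `false`, or `null`, so a timeout read through a mock would be 0 unless it is explicitly stubbed. This sketch uses `java.lang.reflect.Proxy` in place of Mockito, and `FetchSettings` is a hypothetical interface, not Kafka's `FetchParams`.

```java
import java.lang.reflect.Proxy;

// Hypothetical interface standing in for request state that carries a timeout.
interface FetchSettings {
    long maxWaitMs();
    String groupId();
}

class DefaultAnswerDemo {
    // Minimal "mock": every method returns the default value for its return type.
    static FetchSettings defaultAnswerMock() {
        return (FetchSettings) Proxy.newProxyInstance(
            FetchSettings.class.getClassLoader(),
            new Class<?>[] {FetchSettings.class},
            (proxy, method, args) -> {
                Class<?> type = method.getReturnType();
                if (type == long.class) return 0L;
                if (type == int.class) return 0;
                if (type == boolean.class) return false;
                return null;
            });
    }

    // A real instance carries the configured values through to the code under test.
    static FetchSettings realSettings(long maxWaitMs, String groupId) {
        return new FetchSettings() {
            @Override public long maxWaitMs() { return maxWaitMs; }
            @Override public String groupId() { return groupId; }
        };
    }
}
```

Stubbing every such accessor with `when(...)` is exactly the extra work that constructing real instances avoids.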
Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1933545503

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
     )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    val groupId = "grp"
+    val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0))
+    val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+    partitionMaxBytes.put(tp1, 1000)

Review Comment: Can we use `mkMap` from `Utils`?

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -522,6 +532,9 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
+        mockPartitionMaxBytes(partitionMaxBytesStrategy, Collections.singleton(tp1));

Review Comment: nit: I think we can do it as below:
```
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp1));
...
...
private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set partitions) {
```

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
     )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    val groupId = "grp"
+    val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0))
+    val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+    partitionMaxBytes.put(tp1, 1000)
+
+    val sp1 = mock(classOf[SharePartition])
+    val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition]
+    sharePartitions.put(tp1, sp1)
+
+    val future = new CompletableFuture[util.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]]
+    val shareFetch = new ShareFetch(
+      new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+      groupId,
+      Uuid.randomUuid.toString,
+      future,
+      partitionMaxBytes,
+      500,
+      100,
+      brokerTopicStats)
+
+    val delayedShareFetch = new DelayedShareFetch(
+      shareFetch,
+      rm,
+      mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+      sharePartitions)

Review Comment: Shouldn't we mock `DelayedShareFetch`?

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
     )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+
+    val groupId = "grp"
+    val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0))
+    val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+    partitionMaxBytes.put(tp1, 1000)
+
+    val sp1 = mock(classOf[SharePartition])
+    val sharePartitions = new util.LinkedHashMap[TopicIdPartition, SharePartition]
+    sharePartitions.put(tp1, sp
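On the `mkMap` suggestion: as far as we know, Kafka's `org.apache.kafka.common.utils.Utils` offers `mkMap`/`mkEntry` helpers that build an insertion-ordered map from entries, which would replace the `new LinkedHashMap` plus `put` pair in the test. So that the sketch runs without Kafka on the classpath, it re-implements helpers of the same shape locally; `MapHelpers` is a hypothetical name, and the exact signatures of the real helpers should be checked against the Kafka source.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapHelpers {
    // A simple key/value pair, the building block for mkMap.
    static <K, V> Map.Entry<K, V> mkEntry(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Builds a LinkedHashMap so iteration follows argument order, which
    // matters for tests that walk partitions in a fixed order.
    @SafeVarargs
    static <K, V> Map<K, V> mkMap(Map.Entry<K, V>... entries) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : entries) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```

The benefit is mostly readability: the map's contents are declared in one expression, and insertion order is preserved just as with the explicit `LinkedHashMap`.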
adixitconfluent commented on PR #18725: URL: https://github.com/apache/kafka/pull/18725#issuecomment-2620787370

Test failures are unrelated.
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1932602238

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment: I have added a test, `ReplicaManagerTest.testDelayedShareFetchPurgatoryOperationExpiration`, that verifies this.
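To make the expiration check concrete: the new test is meant to show that an operation parked in the purgatory is completed once its `maxWaitMs` has elapsed. The sketch below models only that contract, with a fake clock advanced by hand (roughly what a mock timer allows) instead of a background reaper thread; `ManualClockPurgatory` is a hypothetical class and not a model of Kafka's `DelayedOperationPurgatory` internals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ManualClockPurgatory {
    private static final class Pending {
        final long deadlineMs;
        final CompletableFuture<String> result;

        Pending(long deadlineMs, CompletableFuture<String> result) {
            this.deadlineMs = deadlineMs;
            this.result = result;
        }
    }

    private long nowMs = 0L;
    private final List<Pending> pending = new ArrayList<>();

    // Registers an operation that should expire after maxWaitMs of fake time.
    CompletableFuture<String> watch(long maxWaitMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        pending.add(new Pending(nowMs + maxWaitMs, result));
        return result;
    }

    // Advancing the fake clock stands in for the expiration reaper: every
    // operation whose deadline has been reached is completed as "expired".
    void advanceClock(long deltaMs) {
        nowMs += deltaMs;
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.deadlineMs <= nowMs) {
                p.result.complete("expired");
                it.remove();
            }
        }
    }

    int watchedCount() {
        return pending.size();
    }
}
```

Because nothing runs on another thread, the test decides exactly when time moves, so it can assert "not yet expired" at 499 ms and "expired" at 500 ms without sleeping.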
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1931894291

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment: Hmm, so do we have tests, either in ReplicaManager or elsewhere, that verify the expirationReaper successfully times out share fetch requests?
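What a real reaper implies for test hygiene, assuming the reaper is simply a background thread that force-completes operations after `maxWaitMs`: `ReaperSketch` below is hypothetical and uses a `ScheduledExecutorService` in place of Kafka's timer. It illustrates two points related to this PR's title: the thread must be shut down or it outlives the test, and scheduling work after shutdown fails with `RejectedExecutionException`, which is easy to miss when it is thrown on a thread the test never inspects.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical reaper: one background thread that force-completes operations
// whose maxWaitMs has elapsed. close() must be called, otherwise the thread
// keeps running after the test finishes.
final class ReaperSketch implements AutoCloseable {
    private final ScheduledExecutorService reaper =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "expiration-reaper-sketch");
                t.setDaemon(true);
                return t;
            });

    CompletableFuture<String> watch(long maxWaitMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // complete() is a no-op if the operation already finished normally.
        // After close(), schedule() throws RejectedExecutionException.
        reaper.schedule(() -> { result.complete("expired"); }, maxWaitMs, TimeUnit.MILLISECONDS);
        return result;
    }

    @Override
    public void close() {
        reaper.shutdownNow();
    }
}
```

Using try-with-resources (or an `@AfterEach` hook) ties the reaper's lifetime to the test; disabling the reaper entirely and driving time by hand, as the `false` flag in the diff appears to do, removes the thread altogether.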
adixitconfluent commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1931915386

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -704,6 +704,7 @@ public void testLocksReleasedForCompletedFetch() {
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions1)
             .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))

Review Comment: We need this change wherever `DelayedShareFetch` calls `readFromLog`, directly or indirectly. The tests that exercise that path already set it; this one was missed earlier. We can also mock the response from `partitionMaxBytesStrategy.maxBytes`; that makes sense to me as well.
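For context on the strategy being wired in: as we understand it, the `UNIFORM` type of `PartitionMaxBytesStrategy` splits the request's max bytes evenly across the acquired partitions, which is why the `readFromLog` path needs some strategy to be configured. The stand-in below is hypothetical (simplified to `String` partition names and its own `MaxBytesStrategy` interface) and should be checked against the real implementation before relying on its numbers.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified strategy interface keyed by partition name.
interface MaxBytesStrategy {
    Map<String, Integer> maxBytes(int requestMaxBytes, List<String> partitions, int acquiredPartitionsSize);
}

final class UniformMaxBytes implements MaxBytesStrategy {
    @Override
    public Map<String, Integer> maxBytes(int requestMaxBytes, List<String> partitions, int acquiredPartitionsSize) {
        if (acquiredPartitionsSize <= 0) {
            throw new IllegalArgumentException("acquiredPartitionsSize must be positive");
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String partition : partitions) {
            // Integer division: any remainder bytes are not handed out.
            result.put(partition, requestMaxBytes / acquiredPartitionsSize);
        }
        return result;
    }
}
```

For the `FetchParams` used in the Scala test (max bytes `1024 * 1024`), two acquired partitions would each get 524288 bytes under this reading.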
apoorvmittal10 commented on code in PR #18725: URL: https://github.com/apache/kafka/pull/18725#discussion_r1931897412

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -704,6 +704,7 @@ public void testLocksReleasedForCompletedFetch() {
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions1)
             .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))

Review Comment: Why is the change only here and not in the other methods? Also, should we mock the response from `PartitionMaxBytesStrategy`?

## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
## @@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment: Hmm, so do we have tests, either in ReplicaManager or elsewhere, that verify the expirationReaper times out `DelayedShareFetch` operations?
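On mocking the `maxBytes` response instead of relying on the real `UNIFORM` strategy: a factory-style helper, along the lines of the `mockPartitionMaxBytes(Set partitions)` signature suggested in the review, can build and return the stub in a single call. The sketch uses a plain lambda rather than Mockito so it runs on its own; `PartitionBytesStrategy` and `StrategyStubs.stubMaxBytes` are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical strategy shape, simplified to String partition names.
interface PartitionBytesStrategy {
    Map<String, Integer> maxBytes(int requestMaxBytes, Set<String> partitions, int acquiredPartitionsSize);
}

final class StrategyStubs {
    // Factory-style helper: builds and returns the stub, so a test can write
    // `PartitionBytesStrategy s = stubMaxBytes(...)` instead of creating a
    // mock first and configuring it through a separate void helper.
    static PartitionBytesStrategy stubMaxBytes(Set<String> partitions, int bytesPerPartition) {
        Map<String, Integer> canned = new LinkedHashMap<>();
        for (String p : partitions) {
            canned.put(p, bytesPerPartition);
        }
        Map<String, Integer> frozen = Collections.unmodifiableMap(canned);
        // Ignores its arguments and always returns the same canned answer,
        // similar to a Mockito stub configured with thenReturn(...).
        return (requestMaxBytes, requested, acquiredPartitionsSize) -> frozen;
    }
}
```

Returning a canned map keeps each test independent of how bytes are divided, so a later change to the real strategy cannot break tests that only care about locking or completion behaviour.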