Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-02-10 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1949839653


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,67 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(

Review Comment:
   Apologies, I missed it in review. Should have caught it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-02-10 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1949830538


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,67 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(

Review Comment:
   Thanks @chia7712, makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-02-10 Thread via GitHub


chia7712 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1949798227


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,67 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(

Review Comment:
   As a reminder, it's crucial to utilize a `try-finally` block to ensure 
proper closure of the `ReplicaManager`. Failing to do so can result in an 
unreleased thread from the purgatory, potentially leading to errors in 
subsequent integration tests that incorporate thread leak detection.
   
   ```
   org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: executor-ShareFetch ==> expected:  but was:  
   ```
   Also, the error can be reproduced with the following command.
   ```
   ./gradlew core:test --tests ReplicaManagerTest --tests SaslApiVersionsRequestTest --tests LeaderEpochIntegrationTest --tests RequestQuotaTest -PmaxParallelForks=1
   ```
   
   I will fix it in https://issues.apache.org/jira/browse/KAFKA-18770
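
   For illustration, a minimal sketch of the suggested pattern (assuming `rm.shutdown(checkpointHW = false)` as used elsewhere in `ReplicaManagerTest`; this is a sketch, not the exact fix that will land in KAFKA-18770):
   ```
   val rm = new ReplicaManager(
     metrics = metrics,
     config = config,
     time = time,
     scheduler = new MockScheduler(time),
     logManager = mockLogMgr,
     quotaManagers = quotaManager,
     metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
     alterPartitionManager = alterPartitionManager)
   try {
     // ... body of testDelayedShareFetchPurgatoryOperationExpiration ...
   } finally {
     // Shuts down the delayed share fetch purgatory and its expiration reaper
     // (the executor-ShareFetch thread), so later tests that run thread leak
     // detection do not report it.
     rm.shutdown(checkpointHW = false)
   }
   ```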



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield merged PR #18725:
URL: https://github.com/apache/kafka/pull/18725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933934456


##
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##
@@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation {
 private LinkedHashMap partitionsAcquired;
 private LinkedHashMap 
partitionsAlreadyFetched;
 
-DelayedShareFetch(
+/**
+ * This function returns an instance of delayed share fetch operation for 
completing share fetch requests instantaneously or with delay.

Review Comment:
   done



##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -1106,8 +1106,9 @@ boolean canAcquireRecords() {
  * the records are fetched and acquired.
  *
  * @return A boolean which indicates whether the fetch lock is acquired.
+ * Visible for testing.

Review Comment:
   I have removed "Visible for testing"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933895394


##
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##
@@ -68,7 +68,14 @@ public class DelayedShareFetch extends DelayedOperation {
 private LinkedHashMap partitionsAcquired;
 private LinkedHashMap 
partitionsAlreadyFetched;
 
-DelayedShareFetch(
+/**
+ * This function returns an instance of delayed share fetch operation for 
completing share fetch requests instantaneously or with delay.

Review Comment:
   Constructs rather than returns I think.



##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -1106,8 +1106,9 @@ boolean canAcquireRecords() {
  * the records are fetched and acquired.
  *
  * @return A boolean which indicates whether the fetch lock is acquired.
+ * Visible for testing.

Review Comment:
   If you look at the javadoc, this sentence is part of the description for the 
return value. Either remove "Visible for testing" (preferred) or make sure it 
fits in the javadoc more nicely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933704511


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   Actually, we should check the number of method calls for `forceComplete`, since `onComplete` is more tied to user logic and could cause problems in the future.
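
   For illustration, a minimal sketch of how the call count could be asserted (assuming the operation is wrapped in a Mockito spy and that `forceComplete` can be verified on the spy; this is a sketch, not the PR's exact code):
   ```
   import org.mockito.Mockito.{spy, times, verify}

   // Wrap the real operation in a spy so invocations can be counted without
   // stubbing any of DelayedShareFetch's behaviour.
   val delayedShareFetch = spy(new DelayedShareFetch(
     shareFetch,
     rm,
     mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
     sharePartitions))

   rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch,
     delayedShareFetchKeys = delayedShareFetchWatchKeys)

   // maybeAcquireFetchLock is stubbed to return false, so tryComplete can never
   // complete the operation; the only path to forceComplete is the expiration
   // reaper, making a single invocation a proxy for "the request timed out".
   waitUntilTrue(() => future.isDone, "Delayed share fetch never expired.", 1000)
   verify(delayedShareFetch, times(1)).forceComplete()
   ```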



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933651209


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)

Review Comment:
   Actually, we require `partitionMaxBytes` to be of type `util.LinkedHashMap`, so we cannot use `mkMap`. Here is the failing
[build](https://github.com/apache/kafka/actions/runs/13029166140/job/36344429628?pr=18725)
in `core:compileTestScala` when `mkMap` is used.
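
   For context, a small sketch of why the substitution does not compile (`tp1` as defined in the quoted test; it assumes, as the test does, that the `ShareFetch` constructor takes a `java.util.LinkedHashMap`):
   ```
   import java.util
   import org.apache.kafka.common.utils.Utils.{mkEntry, mkMap}

   // Utils.mkMap is declared to return a plain java.util.Map, so the compiler
   // rejects it wherever the static type util.LinkedHashMap is required.
   val viaMkMap: util.Map[TopicIdPartition, Integer] =
     mkMap(mkEntry(tp1, Integer.valueOf(1000)))

   // Hence the explicit LinkedHashMap in the test:
   val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
   partitionMaxBytes.put(tp1, 1000)
   ```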



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933639965


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   Ideally, `assertFalse(future.isDone)` should be enough, but I've added a check on the number of method calls for `onComplete`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933626593


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)
+
+val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new 
util.ArrayList[DelayedShareFetchKey]
+partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => 
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId, topicIdPartition.partition)))
+
+// You cannot acquire records for sp1, so request will be stored in 
purgatory waiting for timeout.
+when(sp1.maybeAcquireFetchLock).thenReturn(false)
+
+rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch, 
delayedShareFetchKeys = delayedShareFetchWatchKeys)
+assertEquals(1, rm.delayedShareFetchPurgatory.watched)
+
+// Future is not complete initially.
+assertFalse(future.isDone)
+// Post timeout, share fetch request will timeout and the future should 
complete.
+waitUntilTrue(() => future.isDone, "Processing in delayed share fetch 
purgatory never ended.", 1000)

Review Comment:
   Yes, I've added a comment about this. In this test case, the timeout is set to 500ms, but I've kept an additional 500ms buffer so the task can always time out.
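
   For illustration, how the numbers relate in the quoted test (the 500ms is the `maxWaitMs` passed to `FetchParams`; the values below come from that snippet):
   ```
   // FetchParams is built with maxWaitMs = 500, so the purgatory's expiration
   // reaper should force-complete the delayed share fetch after roughly 500ms.
   // Allowing up to 1000ms leaves ~500ms of headroom for slow test machines.
   waitUntilTrue(() => future.isDone,
     "Processing in delayed share fetch purgatory never ended.", 1000)
   ```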



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933620080


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   Hmmm, sure, but how do we guarantee that the request timed out and was not completed in the normal flow? By checking `assertFalse(future.isDone)` initially? Or should we also verify the method call that should only happen when the timeout is triggered, i.e. `forceComplete`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933609359


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)
+
+val delayedShareFetchWatchKeys : util.List[DelayedShareFetchKey] = new 
util.ArrayList[DelayedShareFetchKey]
+partitionMaxBytes.keySet.forEach((topicIdPartition: TopicIdPartition) => 
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, 
topicIdPartition.topicId, topicIdPartition.partition)))
+
+// You cannot acquire records for sp1, so request will be stored in 
purgatory waiting for timeout.
+when(sp1.maybeAcquireFetchLock).thenReturn(false)
+
+rm.addDelayedShareFetchRequest(delayedShareFetch = delayedShareFetch, 
delayedShareFetchKeys = delayedShareFetchWatchKeys)
+assertEquals(1, rm.delayedShareFetchPurgatory.watched)
+
+// Future is not complete initially.
+assertFalse(future.isDone)
+// Post timeout, share fetch request will timeout and the future should 
complete.
+waitUntilTrue(() => future.isDone, "Processing in delayed share fetch 
purgatory never ended.", 1000)

Review Comment:
   Shall we have a bit of buffer, say 1200, and note in a comment that 1000 is the default timeout but a buffer of 200 is kept so the task can always time out?



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   Hmmm, sure, but how do we guarantee that the request timed out and was not completed in the normal flow? By checking `assertFalse(future.isDone)` initially? Or should we also verify the method call that should only happen when the timeout is triggered, i.e. `forceComplete`?

Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933573691


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   I think the relevant attributes of `DelayedShareFetch` have already been mocked, for example the exception handler. If we mocked it instead, we'd have to write a bunch of `when` statements for `delayedShareFetch`. Hence, I prefer an instance over a mock. We are not playing with the behaviour of `DelayedShareFetch`, and we need `shareFetch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933573691


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   I think the relevant attributes of `DelayedShareFetch` have already been mocked, for example the exception handler. If we mocked it instead, we'd have to write a bunch of `when` statements for `delayedShareFetch`. Hence, I prefer an instance over a mock. We are not playing with the behaviour of `DelayedShareFetch` anyway.



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)

Review Comment:
   Again, we need attributes of `ShareFetch` here, like `FetchParams.maxWaitMs`, so I don't think mocking it is the right approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1933545503


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)

Review Comment:
   Can we use mkMap from Utils?



##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -522,6 +532,9 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
 
 doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
+PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mock(PartitionMaxBytesStrategy.class);
+mockPartitionMaxBytes(partitionMaxBytesStrategy, 
Collections.singleton(tp1));

Review Comment:
   nit: I think we can do as below: 
   
   ```
   PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp1));
   ...
   ...
   private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set partitions) {
   ```



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp1)
+
+val future = new CompletableFuture[util.Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData]]
+val shareFetch = new ShareFetch(
+  new FetchParams(ApiKeys.SHARE_FETCH.latestVersion, 
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024, 
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+  groupId,
+  Uuid.randomUuid.toString,
+  future,
+  partitionMaxBytes,
+  500,
+  100,
+  brokerTopicStats)
+
+val delayedShareFetch = new DelayedShareFetch(
+  shareFetch,
+  rm,
+  mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
+  sharePartitions)

Review Comment:
   Shouldn't we mock `DelayedShareFetch`?



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5977,6 +5980,64 @@ class ReplicaManagerTest {
 )
   }
 
+  @Test
+  def testDelayedShareFetchPurgatoryOperationExpiration(): Unit = {
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)))
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = mockLogMgr,
+  quotaManagers = quotaManager,
+  metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+
+val groupId = "grp"
+val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 
0))
+val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
+partitionMaxBytes.put(tp1, 1000)
+
+val sp1 = mock(classOf[SharePartition])
+val sharePartitions = new util.LinkedHashMap[TopicIdPartition, 
SharePartition]
+sharePartitions.put(tp1, sp

Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-28 Thread via GitHub


adixitconfluent commented on PR #18725:
URL: https://github.com/apache/kafka/pull/18725#issuecomment-2620787370

   Test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-28 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1932602238


##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
 
 DelayedOperationPurgatory 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
 "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment:
   I have added a test 
`ReplicaManagerTest.testDelayedShareFetchPurgatoryOperationExpiration` 
verifying the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-28 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1931894291


##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
 
 DelayedOperationPurgatory 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
 "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment:
   Hmm, so do we have tests, either in ReplicaManager or elsewhere, verifying that the expirationReaper times out share fetch requests successfully?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-28 Thread via GitHub


adixitconfluent commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1931915386


##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -704,6 +704,7 @@ public void testLocksReleasedForCompletedFetch() {
 .withShareFetchData(shareFetch)
 .withSharePartitions(sharePartitions1)
 .withReplicaManager(replicaManager)
+
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))

Review Comment:
   We need this change whenever the `readFromLog` functionality is called in `DelayedShareFetch`. It was already present in the tests where this functionality was called directly or indirectly; somehow, this one got missed earlier.
   
   We can mock the response from `partitionMaxBytesStrategy.maxBytes` as well. That makes sense to me too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18653: Fix mocks and potential thread leak issues causing silent RejectedExecutionException in share group broker tests [kafka]

2025-01-28 Thread via GitHub


apoorvmittal10 commented on code in PR #18725:
URL: https://github.com/apache/kafka/pull/18725#discussion_r1931897412


##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -704,6 +704,7 @@ public void testLocksReleasedForCompletedFetch() {
 .withShareFetchData(shareFetch)
 .withSharePartitions(sharePartitions1)
 .withReplicaManager(replicaManager)
+
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))

Review Comment:
   Why is the change only here and not in the other methods? Also, should we mock the response from `PartitionMaxBytesStrategy`?



##
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##
@@ -492,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
 
 DelayedOperationPurgatory 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
 "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
-DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);

Review Comment:
   Hmm, so do we have tests, either in ReplicaManager or elsewhere, verifying that the expirationReaper times out a DelayedShareFetch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org