apoorvmittal10 commented on code in PR #19701:
URL: https://github.com/apache/kafka/pull/19701#discussion_r2087080717


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2803,6 +2804,42 @@ public void testFetchMessagesRotatePartitions() {
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 1);
     }
 
+    @Test
+    public void testCreateIdleShareFetchTask() throws Exception {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        MockTime mockTime = new MockTime();

Review Comment:
   `time` which resets in setup can be used.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2803,6 +2804,42 @@ public void testFetchMessagesRotatePartitions() {
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 1);
     }
 
+    @Test
+    public void testCreateIdleShareFetchTask() throws Exception {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        long maxWaitMs = 1000L;
+
+        // Set up the mock to capture and add the timer task
+        Mockito.doAnswer(invocation -> {
+            TimerTask timerTask = invocation.getArgument(0);
+            mockTimer.add(timerTask);
+            return null;
+        
}).when(replicaManager).addDelayedShareFetchTimerRequest(Mockito.any(TimerTask.class));
+
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withTime(mockTime)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Void> future = 
sharePartitionManager.createIdleShareFetchTimerTask(maxWaitMs);
+
+        // Future should not be completed immediately
+        assertFalse(future.isDone());
+
+        mockTimer.advanceClock(maxWaitMs / 2);
+        assertFalse(future.isDone());
+
+        mockTimer.advanceClock(maxWaitMs);
+

Review Comment:
   nit: remove line break



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID

Review Comment:
   Can be inline, not required here.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).

Review Comment:
   ```suggestion
         setGroupId("group").
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2199,6 +2199,41 @@ public void testShareGroupMaxSizeConfigExceeded() throws 
Exception {
         shareConsumer4.close();
     }
 
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = "group.share.max.size", value = "1"), 
// Setting max group size to 1
+            @ClusterConfigProperty(key = "group.share.max.share.sessions", 
value = "1") // Setting max share sessions value to 1
+        }
+    )
+    public void testShareGroupShareSessionCacheIsFull() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group2")) {
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record);
+            producer.flush();
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
+
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer1, 2500L, 1);
+            assertEquals(1, records.count());
+
+            producer.send(record);
+            producer.flush();
+
+            // The second share consumer should not throw any exception, but 
should not receive any records as well.
+            records = shareConsumer2.poll(Duration.ofMillis(1000));

Review Comment:
   So the client received `session error` but didn't provide records to 
application, correct? Does there appear log line in client which specifies the 
session error? 



##########
core/src/main/java/kafka/server/share/IdleShareFetchTimerTask.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;

Review Comment:
   Also can be an inner class in SharePartitionManager.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)

Review Comment:
   ```suggestion
       addTopicToMetadataCache("foo", 1, topicId = topicId)
   ```



##########
core/src/main/java/kafka/server/share/IdleShareFetchTimerTask.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import java.util.concurrent.CompletableFuture;
+
+public class IdleShareFetchTimerTask extends TimerTask {

Review Comment:
   Need class java docs.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -448,6 +451,10 @@ class ReplicaManager(val config: KafkaConfig,
     delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, 
delayedShareFetchKeys)
   }
 
+  private[server] def addDelayedShareFetchTimerRequest(timerTask: TimerTask): 
Unit = {

Review Comment:
   Need method comments.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2803,6 +2804,42 @@ public void testFetchMessagesRotatePartitions() {
         validateRotatedListEquals(topicIdPartitions, 
resultShareFetch.topicIdPartitions(), 1);
     }
 
+    @Test
+    public void testCreateIdleShareFetchTask() throws Exception {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        MockTime mockTime = new MockTime();
+        MockTimer mockTimer = new MockTimer(mockTime);
+        long maxWaitMs = 1000L;
+
+        // Set up the mock to capture and add the timer task
+        Mockito.doAnswer(invocation -> {
+            TimerTask timerTask = invocation.getArgument(0);
+            mockTimer.add(timerTask);
+            return null;
+        
}).when(replicaManager).addDelayedShareFetchTimerRequest(Mockito.any(TimerTask.class));
+
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withTime(mockTime)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Void> future = 
sharePartitionManager.createIdleShareFetchTimerTask(maxWaitMs);
+

Review Comment:
   nit: remove line break.



##########
core/src/main/java/kafka/server/share/IdleShareFetchTimerTask.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;

Review Comment:
   Should not be in `core` but in `server` module.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -291,6 +292,8 @@ class ReplicaManager(val config: KafkaConfig,
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
   private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config)
 

Review Comment:
   nit: remove line break.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -388,6 +389,21 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
         return mapAcknowledgementFutures(futuresMap, Optional.empty());
     }
 
+    /**
+     * The createShareFetchTask creates a timer task to delay the share fetch 
request for maxWaitMs duration.

Review Comment:
   ```suggestion
        * The method creates a timer task to delay the share fetch request for 
maxWaitMs duration.
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).

Review Comment:
   ```suggestion
         setMemberId(Uuid.randomUuid.toString).
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))

Review Comment:
   Query: why do we need runAsync?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), 
any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(util.List.of(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)))))
+
+    val shareFetchRequest = new 
ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.SHARE_SESSION_LIMIT_REACHED.code, 
responseData.errorCode)

Review Comment:
   How do we verify the if request waited for maxWaitTime?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"

Review Comment:
   Can be inline, not required here.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, 
responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"

Review Comment:
   Can be inline, not required here.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -448,6 +451,10 @@ class ReplicaManager(val config: KafkaConfig,
     delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch, 
delayedShareFetchKeys)
   }
 
+  private[server] def addDelayedShareFetchTimerRequest(timerTask: TimerTask): 
Unit = {

Review Comment:
   addDelayedShareFetchTimerRequest => addShareFetchTimerRequest



##########
core/src/main/java/kafka/server/share/IdleShareFetchTimerTask.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import org.apache.kafka.server.util.timer.TimerTask;
+
+import java.util.concurrent.CompletableFuture;
+
+public class IdleShareFetchTimerTask extends TimerTask {
+
+    private final CompletableFuture<Void> future;

Review Comment:
   Need comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to