chia7712 commented on code in PR #19582:
URL: https://github.com/apache/kafka/pull/19582#discussion_r2073303842


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerAssignmentPoller.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConsumerAssignmentPoller extends ShutdownableThread {
+    private final Consumer<byte[], byte[]> consumer;
+
+    private final Set<TopicPartition> partitionAssignment = new HashSet<>();
+    private volatile boolean subscriptionChanged = false;
+    private final List<String> topicsSubscription;
+    private final ConsumerRebalanceListener rebalanceListener;
+
+    public ConsumerAssignmentPoller(
+        Consumer<byte[], byte[]> consumer,
+        List<String> topicsToSubscribe
+    ) {
+        this(consumer, topicsToSubscribe, Set.of(), null);
+    }
+
+    public ConsumerAssignmentPoller(
+        Consumer<byte[], byte[]> consumer,
+        List<String> topicsToSubscribe,
+        Set<TopicPartition> partitionsToAssign,
+        ConsumerRebalanceListener userRebalanceListener
+    ) {
+        super("daemon-consumer-assignment", false);
+        this.consumer = consumer;
+        this.topicsSubscription = topicsToSubscribe;
+
+        this.rebalanceListener = new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                partitionAssignment.addAll(partitions);
+                if (userRebalanceListener != null)
+                    userRebalanceListener.onPartitionsAssigned(partitions);
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                partitionAssignment.removeAll(partitions);
+                if (userRebalanceListener != null)
+                    userRebalanceListener.onPartitionsRevoked(partitions);
+            }
+        };
+
+        if (partitionsToAssign.isEmpty()) {
+            consumer.subscribe(topicsToSubscribe, rebalanceListener);
+        } else {
+            consumer.assign(List.copyOf(partitionsToAssign));
+        }
+    }
+
+    public Set<TopicPartition> consumerAssignment() {
+        return Set.copyOf(partitionAssignment);
+    }
+
+    @Override
+    public boolean initiateShutdown() {
+        boolean res = super.initiateShutdown();
+        consumer.wakeup();
+        return res;
+    }
+
+    @Override
+    public void doWork() {
+        if (subscriptionChanged) {
+            consumer.subscribe(topicsSubscription, rebalanceListener);
+            subscriptionChanged = false;
+        }
+        try {
+            consumer.poll(Duration.ofMillis(50)).count();
+        } catch (WakeupException e) {
+            // ignore for shutdown
+        } catch (Throwable e) {

Review Comment:
   This `catch (Throwable e)` block is redundant.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java:
##########
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static 
org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerPollTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = 
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = 
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerPollTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final double EPSILON = 0.1;
+    public static final long GROUP_MAX_SESSION_TIMEOUT_MS = 60000L;
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp2 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerPollTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+    
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerMaxPollRecords() throws 
InterruptedException {
+        testMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollRecords() throws InterruptedException {
+        testMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testMaxPollRecords(GroupProtocol groupProtocol) throws 
InterruptedException {
+        var maxPollRecords = 100;
+        var numRecords = 5000;
+        Map<String, Object> config = Map.of(
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords,
+            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        var startingTimestamp = System.currentTimeMillis();
+        sendRecords(cluster, tp, numRecords, startingTimestamp);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                tp,
+                numRecords,
+                maxPollRecords,
+                0,
+                0,
+                startingTimestamp,
+                -1
+            );
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMs() throws 
InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 2000
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMs() throws 
InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testMaxPollIntervalMs(Map<String, Object> config) throws 
InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            assertEquals(1, listener.callsToAssigned);
+            assertEquals(0, listener.callsToRevoked);
+
+            // after we extend longer than max.poll a rebalance should be 
triggered
+            // NOTE we need to have a relatively much larger value than 
max.poll to let heartbeat expired for sure
+            Thread.sleep(3000);

Review Comment:
   Could you please use `TimeUnit` (e.g. `TimeUnit.SECONDS.sleep(3)`) instead of `Thread.sleep(3000)`?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java:
##########
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static 
org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerPollTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = 
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = 
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerPollTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final double EPSILON = 0.1;
+    public static final long GROUP_MAX_SESSION_TIMEOUT_MS = 60000L;
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp2 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerPollTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+    
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerMaxPollRecords() throws 
InterruptedException {
+        testMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollRecords() throws InterruptedException {
+        testMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testMaxPollRecords(GroupProtocol groupProtocol) throws 
InterruptedException {
+        var maxPollRecords = 100;
+        var numRecords = 5000;
+        Map<String, Object> config = Map.of(
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords,
+            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        var startingTimestamp = System.currentTimeMillis();
+        sendRecords(cluster, tp, numRecords, startingTimestamp);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                tp,
+                numRecords,
+                maxPollRecords,
+                0,
+                0,
+                startingTimestamp,
+                -1
+            );
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMs() throws 
InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 2000
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMs() throws 
InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testMaxPollIntervalMs(Map<String, Object> config) throws 
InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            assertEquals(1, listener.callsToAssigned);
+            assertEquals(0, listener.callsToRevoked);
+
+            // after we extend longer than max.poll a rebalance should be 
triggered
+            // NOTE we need to have a relatively much larger value than 
max.poll to let heartbeat expired for sure
+            Thread.sleep(3000);
+
+            awaitRebalance(consumer, listener);
+            assertEquals(2, listener.callsToAssigned);
+            assertEquals(1, listener.callsToRevoked);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsDelayInRevocation() throws 
InterruptedException {
+        testMaxPollIntervalMsDelayInRevocation(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 1000,
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsDelayInRevocation() throws 
InterruptedException {
+        testMaxPollIntervalMsDelayInRevocation(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    private void testMaxPollIntervalMsDelayInRevocation(Map<String, Object> 
config) throws InterruptedException {
+        var commitCompleted = new AtomicBoolean(false);
+        var committedPosition = new AtomicLong(-1);
+
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener() {
+                @Override
+                public void onPartitionsLost(Collection<TopicPartition> 
partitions) {
+                    // no op
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                    if (!partitions.isEmpty() && partitions.contains(tp)) {
+                        // on the second rebalance (after we have joined the 
group initially), sleep longer
+                        // than session timeout and then try a commit. We 
should still be in the group,
+                        // so the commit should succeed
+                        Utils.sleep(1500);
+                        committedPosition.set(consumer.position(tp));
+                        var offsets = Map.of(tp, new 
OffsetAndMetadata(committedPosition.get()));
+                        consumer.commitSync(offsets);
+                        commitCompleted.set(true);
+                    }
+                    super.onPartitionsRevoked(partitions);
+                }
+            };
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+
+            // force a rebalance to trigger an invocation of the revocation 
callback while in the group
+            consumer.subscribe(List.of("otherTopic"), listener);
+            awaitRebalance(consumer, listener);
+
+            assertEquals(0, committedPosition.get());
+            assertTrue(commitCompleted.get());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsDelayInAssignment() throws 
InterruptedException {
+        testMaxPollIntervalMsDelayInAssignment(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 1000,
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsDelayInAssignment() throws 
InterruptedException {
+        testMaxPollIntervalMsDelayInAssignment(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    private void testMaxPollIntervalMsDelayInAssignment(Map<String, Object> 
config) throws InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            TestConsumerReassignmentListener listener = new 
TestConsumerReassignmentListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // sleep longer than the session timeout, we should still 
be in the group after invocation
+                    Utils.sleep(1500);
+                    super.onPartitionsAssigned(partitions);
+                }
+            };
+            consumer.subscribe(List.of(topic), listener);
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            // We should still be in the group after this invocation
+            ensureNoRebalance(consumer, listener);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsShorterThanPollTimeout() 
throws InterruptedException {
+        testMaxPollIntervalMsShorterThanPollTimeout(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsShorterThanPollTimeout() 
throws InterruptedException {
+        testMaxPollIntervalMsShorterThanPollTimeout(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testMaxPollIntervalMsShorterThanPollTimeout(Map<String, 
Object> config) throws InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            var callsToAssignedAfterFirstRebalance = listener.callsToAssigned;
+            
+            consumer.poll(Duration.ofMillis(2000));
+            // If the poll above times out, it would trigger a rebalance.
+            // Leave some time for the rebalance to happen and check for the 
rebalance event.
+            consumer.poll(Duration.ofMillis(500));
+            consumer.poll(Duration.ofMillis(500));
+
+            assertEquals(callsToAssignedAfterFirstRebalance, 
listener.callsToAssigned);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLeadWithMaxPollRecords() throws 
InterruptedException {
+        testPerPartitionLeadWithMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadWithMaxPollRecords() throws 
InterruptedException {
+        testPerPartitionLeadWithMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testPerPartitionLeadWithMaxPollRecords(GroupProtocol 
groupProtocol) throws InterruptedException {
+        int numMessages = 1000;
+        int maxPollRecords = 10;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords",
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords
+        );
+
+        sendRecords(cluster, tp, numMessages);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            awaitNonEmptyRecords(consumer, tp, 100);
+
+            var tags = Map.of(
+                "client-id", "testPerPartitionLeadWithMaxPollRecords",
+                "topic", tp.topic(),
+                "partition", String.valueOf(tp.partition())
+            );
+            var lead = consumer.metrics()
+                    .get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags));
+            assertEquals(maxPollRecords, (Double) lead.metricValue(), "The 
lead should be " + maxPollRecords);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagWithMaxPollRecords() throws 
InterruptedException {
+        testPerPartitionLagWithMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagWithMaxPollRecords() throws 
InterruptedException {
+        testPerPartitionLagWithMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testPerPartitionLagWithMaxPollRecords(GroupProtocol 
groupProtocol) throws InterruptedException {
+        int numMessages = 1000;
+        int maxPollRecords = 10;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords",
+            CLIENT_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords",
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords
+        );
+        sendRecords(cluster, tp, numMessages);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            var records = awaitNonEmptyRecords(consumer, tp, 100);
+
+            var tags = Map.of(
+                "client-id", "testPerPartitionLagWithMaxPollRecords",
+                "topic", tp.topic(),
+                "partition", String.valueOf(tp.partition())
+            );
+            var lag = consumer.metrics()
+                    .get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags));
+
+            // Count the number of records received
+            var recordCount = records.count();
+            assertEquals(
+                numMessages - recordCount,
+                (Double) lag.metricValue(),
+                EPSILON,
+                "The lag should be " + (numMessages - recordCount)
+            );
+        }
+    }
+
+    @ClusterTest
+    public void runCloseClassicConsumerMultiConsumerSessionTimeoutTest() 
throws InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CLASSIC, true);
+    }
+
+    @ClusterTest
+    public void runClassicConsumerMultiConsumerSessionTimeoutTest() throws 
InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CLASSIC, false);
+    }
+
+    @ClusterTest
+    public void runCloseAsyncConsumerMultiConsumerSessionTimeoutTest() throws 
InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CONSUMER, true);
+    }
+
+    @ClusterTest
+    public void runAsyncConsumerMultiConsumerSessionTimeoutTest() throws 
InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CONSUMER, false);
+    }
+
+    private void runMultiConsumerSessionTimeoutTest(GroupProtocol 
groupProtocol, boolean closeConsumer) throws InterruptedException {
+        String topic1 = "topic1";
+        int partitions = 6;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "test-group",
+            MAX_POLL_INTERVAL_MS_CONFIG, 100
+        );
+        // use consumers defined in this class plus one additional consumer
+        // Use topic defined in this class + one additional topic
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             // create one more consumer and add it to the group; we will 
timeout this consumer

Review Comment:
   Typo in the comment: the verb form is `time out` (two words), not `timeout`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to