chia7712 commented on code in PR #19582:
URL: https://github.com/apache/kafka/pull/19582#discussion_r2095391155


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java:
##########
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = PlaintextConsumerPollTest.BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
+        @ClusterConfigProperty(key = CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"),
+    }
+)
+public class PlaintextConsumerPollTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final double EPSILON = 0.1;
+    public static final long GROUP_MAX_SESSION_TIMEOUT_MS = 60000L;
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp2 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerPollTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+    
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerMaxPollRecords() throws InterruptedException {
+        testMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollRecords() throws InterruptedException {
+        testMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testMaxPollRecords(GroupProtocol groupProtocol) throws InterruptedException {
+        var maxPollRecords = 100;
+        var numRecords = 5000;
+        Map<String, Object> config = Map.of(
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords,
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
+        );
+        var startingTimestamp = System.currentTimeMillis();
+        sendRecords(cluster, tp, numRecords, startingTimestamp);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                tp,
+                numRecords,
+                maxPollRecords,
+                0,
+                0,
+                startingTimestamp,
+                -1
+            );
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMs() throws InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 2000
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMs() throws InterruptedException {
+        testMaxPollIntervalMs(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testMaxPollIntervalMs(Map<String, Object> config) throws InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            assertEquals(1, listener.callsToAssigned);
+            assertEquals(0, listener.callsToRevoked);
+
+            // after we extend longer than max.poll a rebalance should be triggered
+            // NOTE we need to have a relatively much larger value than max.poll to let heartbeat expired for sure
+            TimeUnit.MILLISECONDS.sleep(3000);
+
+            awaitRebalance(consumer, listener);
+            assertEquals(2, listener.callsToAssigned);
+            assertEquals(1, listener.callsToRevoked);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsDelayInRevocation() throws InterruptedException {
+        testMaxPollIntervalMsDelayInRevocation(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 1000,
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsDelayInRevocation() throws InterruptedException {
+        testMaxPollIntervalMsDelayInRevocation(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    private void testMaxPollIntervalMsDelayInRevocation(Map<String, Object> config) throws InterruptedException {
+        var commitCompleted = new AtomicBoolean(false);
+        var committedPosition = new AtomicLong(-1);
+
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener() {
+                @Override
+                public void onPartitionsLost(Collection<TopicPartition> partitions) {
+                    // no op
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                    if (!partitions.isEmpty() && partitions.contains(tp)) {
+                        // on the second rebalance (after we have joined the group initially), sleep longer
+                        // than session timeout and then try a commit. We should still be in the group,
+                        // so the commit should succeed
+                        Utils.sleep(1500);
+                        committedPosition.set(consumer.position(tp));
+                        var offsets = Map.of(tp, new OffsetAndMetadata(committedPosition.get()));
+                        consumer.commitSync(offsets);
+                        commitCompleted.set(true);
+                    }
+                    super.onPartitionsRevoked(partitions);
+                }
+            };
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+
+            // force a rebalance to trigger an invocation of the revocation callback while in the group
+            consumer.subscribe(List.of("otherTopic"), listener);
+            awaitRebalance(consumer, listener);
+
+            assertEquals(0, committedPosition.get());
+            assertTrue(commitCompleted.get());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsDelayInAssignment() throws InterruptedException {
+        testMaxPollIntervalMsDelayInAssignment(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500,
+            SESSION_TIMEOUT_MS_CONFIG, 1000,
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsDelayInAssignment() throws InterruptedException {
+        testMaxPollIntervalMsDelayInAssignment(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 5000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, false
+        ));
+    }
+
+    private void testMaxPollIntervalMsDelayInAssignment(Map<String, Object> config) throws InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            TestConsumerReassignmentListener listener = new TestConsumerReassignmentListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    // sleep longer than the session timeout, we should still be in the group after invocation
+                    Utils.sleep(1500);
+                    super.onPartitionsAssigned(partitions);
+                }
+            };
+            consumer.subscribe(List.of(topic), listener);
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            // We should still be in the group after this invocation
+            ensureNoRebalance(consumer, listener);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerMaxPollIntervalMsShorterThanPollTimeout() throws InterruptedException {
+        testMaxPollIntervalMsShorterThanPollTimeout(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            HEARTBEAT_INTERVAL_MS_CONFIG, 500
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerMaxPollIntervalMsShorterThanPollTimeout() throws InterruptedException {
+        testMaxPollIntervalMsShorterThanPollTimeout(Map.of(
+            MAX_POLL_INTERVAL_MS_CONFIG, 1000,
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testMaxPollIntervalMsShorterThanPollTimeout(Map<String, Object> config) throws InterruptedException {
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(topic), listener);
+
+            // rebalance to get the initial assignment
+            awaitRebalance(consumer, listener);
+            var callsToAssignedAfterFirstRebalance = listener.callsToAssigned;
+            
+            consumer.poll(Duration.ofMillis(2000));
+            // If the poll above times out, it would trigger a rebalance.
+            // Leave some time for the rebalance to happen and check for the rebalance event.
+            consumer.poll(Duration.ofMillis(500));
+            consumer.poll(Duration.ofMillis(500));
+
+            assertEquals(callsToAssignedAfterFirstRebalance, listener.callsToAssigned);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLeadWithMaxPollRecords() throws InterruptedException {
+        testPerPartitionLeadWithMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadWithMaxPollRecords() throws InterruptedException {
+        testPerPartitionLeadWithMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testPerPartitionLeadWithMaxPollRecords(GroupProtocol groupProtocol) throws InterruptedException {
+        int numMessages = 1000;
+        int maxPollRecords = 10;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords",
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords
+        );
+
+        sendRecords(cluster, tp, numMessages);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            awaitNonEmptyRecords(consumer, tp, 100);
+
+            var tags = Map.of(
+                "client-id", "testPerPartitionLeadWithMaxPollRecords",
+                "topic", tp.topic(),
+                "partition", String.valueOf(tp.partition())
+            );
+            var lead = consumer.metrics()
+                    .get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags));
+            assertEquals(maxPollRecords, (Double) lead.metricValue(), "The lead should be " + maxPollRecords);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagWithMaxPollRecords() throws InterruptedException {
+        testPerPartitionLagWithMaxPollRecords(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagWithMaxPollRecords() throws InterruptedException {
+        testPerPartitionLagWithMaxPollRecords(GroupProtocol.CONSUMER);
+    }
+
+    private void testPerPartitionLagWithMaxPollRecords(GroupProtocol groupProtocol) throws InterruptedException {
+        int numMessages = 1000;
+        int maxPollRecords = 10;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords",
+            CLIENT_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords",
+            MAX_POLL_RECORDS_CONFIG, maxPollRecords
+        );
+        sendRecords(cluster, tp, numMessages);
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+            var records = awaitNonEmptyRecords(consumer, tp, 100);
+
+            var tags = Map.of(
+                "client-id", "testPerPartitionLagWithMaxPollRecords",
+                "topic", tp.topic(),
+                "partition", String.valueOf(tp.partition())
+            );
+            var lag = consumer.metrics()
+                    .get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+
+            // Count the number of records received
+            var recordCount = records.count();
+            assertEquals(
+                numMessages - recordCount,
+                (Double) lag.metricValue(),
+                EPSILON,
+                "The lag should be " + (numMessages - recordCount)
+            );
+        }
+    }
+
+    @ClusterTest
+    public void runCloseClassicConsumerMultiConsumerSessionTimeoutTest() throws InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CLASSIC, true);
+    }
+
+    @ClusterTest
+    public void runClassicConsumerMultiConsumerSessionTimeoutTest() throws InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CLASSIC, false);
+    }
+
+    @ClusterTest
+    public void runCloseAsyncConsumerMultiConsumerSessionTimeoutTest() throws InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CONSUMER, true);
+    }
+
+    @ClusterTest
+    public void runAsyncConsumerMultiConsumerSessionTimeoutTest() throws InterruptedException {
+        runMultiConsumerSessionTimeoutTest(GroupProtocol.CONSUMER, false);
+    }
+
+    private void runMultiConsumerSessionTimeoutTest(GroupProtocol groupProtocol, boolean closeConsumer) throws InterruptedException {
+        String topic1 = "topic1";
+        int partitions = 6;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "test-group",
+            MAX_POLL_INTERVAL_MS_CONFIG, 100
+        );
+        // use consumers defined in this class plus one additional consumer
+        // Use topic defined in this class + one additional topic
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             // create one more consumer and add it to the group; we will time out this consumer
+             Consumer<byte[], byte[]> consumer1 = cluster.consumer(config);
+             Consumer<byte[], byte[]> consumer2 = cluster.consumer(config);
+             Consumer<byte[], byte[]> timeoutConsumer = cluster.consumer(config)
+        ) {
+            sendRecords(producer, tp, 100, System.currentTimeMillis(), -1);
+            sendRecords(producer, tp2, 100, System.currentTimeMillis(), -1);
+
+            Set<TopicPartition> subscriptions = new HashSet<>();
+            subscriptions.add(tp);
+            subscriptions.add(tp2);
+
+            cluster.createTopic(topic1, partitions, (short) BROKER_COUNT);
+            IntStream.range(0, partitions).forEach(partition -> {
+                TopicPartition topicPartition = new TopicPartition(topic1, partition);
+                sendRecords(producer, topicPartition, 100, System.currentTimeMillis(), -1);
+                subscriptions.add(topicPartition);
+            });
+
+            // first subscribe consumers that are defined in this class
+            List<ConsumerAssignmentPoller> consumerPollers = new ArrayList<>();
+            consumerPollers.add(subscribeConsumerAndStartPolling(consumer1, List.of(topic, topic1)));
+            consumerPollers.add(subscribeConsumerAndStartPolling(consumer2, List.of(topic, topic1)));
+            
+            ConsumerAssignmentPoller timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List.of(topic, topic1));
+            consumerPollers.add(timeoutPoller);
+
+            // validate the initial assignment
+            validateGroupAssignment(consumerPollers, subscriptions, null);
+
+            // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
+            timeoutPoller.shutdown();
+            consumerPollers.remove(timeoutPoller);
+            if (closeConsumer)
+                timeoutConsumer.close();
+
+            validateGroupAssignment(consumerPollers, subscriptions,
+                    "Did not get valid assignment for partitions " + subscriptions + " after one consumer left");
+
+            // done with pollers and consumers
+            for (ConsumerAssignmentPoller poller : consumerPollers)
+                poller.shutdown();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPollEventuallyReturnsRecordsWithZeroTimeout() throws InterruptedException {
+        testPollEventuallyReturnsRecordsWithZeroTimeout(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPollEventuallyReturnsRecordsWithZeroTimeout() throws InterruptedException {
+        testPollEventuallyReturnsRecordsWithZeroTimeout(GroupProtocol.CONSUMER);
+    }
+
+    private void testPollEventuallyReturnsRecordsWithZeroTimeout(GroupProtocol groupProtocol) throws InterruptedException {
+        int numMessages = 100;
+        Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+        sendRecords(cluster, tp, numMessages);
+        
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.subscribe(List.of(topic));
+            var records = awaitNonEmptyRecords(consumer, tp, 0L);
+            assertEquals(numMessages, records.count());
+        }
+        
+    }
+
+    @ClusterTest
+    public void testClassicConsumerNoOffsetForPartitionExceptionOnPollZero() throws InterruptedException {
+        testNoOffsetForPartitionExceptionOnPollZero(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerNoOffsetForPartitionExceptionOnPollZero() throws InterruptedException {
+        testNoOffsetForPartitionExceptionOnPollZero(GroupProtocol.CONSUMER);
+    }
+
+    private void testNoOffsetForPartitionExceptionOnPollZero(GroupProtocol groupProtocol) throws InterruptedException {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+            AUTO_OFFSET_RESET_CONFIG, "none"
+        );
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+            consumer.assign(List.of(tp));
+
+            // continuous poll should eventually fail because there is no offset reset strategy set
+            // (fail only when resetting positions after coordinator is known)
+            TestUtils.waitForCondition(() -> {

Review Comment:
   ```java
               TestUtils.waitForCondition(() -> {
                   try {
                       consumer.poll(Duration.ZERO);
                       return false;
                   } catch (NoOffsetForPartitionException e) {
                       return true;
                   }
                }, "Continuous poll did not fail with NoOffsetForPartitionException");
   ```
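   Retrying `poll(Duration.ZERO)` inside `waitForCondition` seems safer than asserting on a single poll, since the exception should only surface once the coordinator is known and the position reset is actually attempted.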
   WDYT?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java:
##########
@@ -0,0 +1,755 @@
+
+    private void runMultiConsumerSessionTimeoutTest(GroupProtocol groupProtocol, boolean closeConsumer) throws InterruptedException {
+        String topic1 = "topic1";
+        int partitions = 6;
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "test-group",
+            MAX_POLL_INTERVAL_MS_CONFIG, 100
+        );
+        // use consumers defined in this class plus one additional consumer
+        // Use topic defined in this class + one additional topic
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             // create one more consumer and add it to the group; we will time out this consumer
+             Consumer<byte[], byte[]> consumer1 = cluster.consumer(config);
+             Consumer<byte[], byte[]> consumer2 = cluster.consumer(config);
+             Consumer<byte[], byte[]> timeoutConsumer = cluster.consumer(config)
+        ) {
+            sendRecords(producer, tp, 100, System.currentTimeMillis(), -1);
+            sendRecords(producer, tp2, 100, System.currentTimeMillis(), -1);
+
+            Set<TopicPartition> subscriptions = new HashSet<>();
+            subscriptions.add(tp);
+            subscriptions.add(tp2);
+
+            cluster.createTopic(topic1, partitions, (short) BROKER_COUNT);
+            IntStream.range(0, partitions).forEach(partition -> {
+                TopicPartition topicPartition = new TopicPartition(topic1, partition);
+                sendRecords(producer, topicPartition, 100, System.currentTimeMillis(), -1);
+                subscriptions.add(topicPartition);
+            });
+
+            // first subscribe consumers that are defined in this class
+            List<ConsumerAssignmentPoller> consumerPollers = new ArrayList<>();

Review Comment:
   Could you please use `try-finally` to ensure it does not cause a leak?
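   A rough sketch of the shape this could take, reusing the helpers already in the test (`subscribeConsumerAndStartPolling`, `ConsumerAssignmentPoller#shutdown`); the elided assertions are only placeholders and the rest of the method stays as it is:
   ```java
               List<ConsumerAssignmentPoller> consumerPollers = new ArrayList<>();
               try {
                   // register every poller as soon as it is created so the finally block can see it
                   consumerPollers.add(subscribeConsumerAndStartPolling(consumer1, List.of(topic, topic1)));
                   consumerPollers.add(subscribeConsumerAndStartPolling(consumer2, List.of(topic, topic1)));
                   ConsumerAssignmentPoller timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List.of(topic, topic1));
                   consumerPollers.add(timeoutPoller);

                   // ... assignment validation and the timeoutPoller shutdown/removal as in the current version ...
               } finally {
                   // always stop whatever pollers are still registered, even if an assertion above fails
                   for (ConsumerAssignmentPoller poller : consumerPollers)
                       poller.shutdown();
               }
   ```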



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

