junrao commented on code in PR #18277:
URL: https://github.com/apache/kafka/pull/18277#discussion_r1937822716


##########
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.integration;
+import kafka.integration.KafkaServerTestHarness;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.Logging;
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.FeatureUpdate;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.mutable.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging {
+    private String bootstrapServer;
+    private String testTopicName;
+    private Admin adminClient;
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        List<Properties> brokerConfigs = new ArrayList<>();
+        brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(
+            5,
+            true,
+            true,
+            scala.Option.<SecurityProtocol>empty(),
+            scala.Option.<File>empty(),
+            scala.Option.<Properties>empty(),
+            true,
+            false,
+            false,
+            false,
+            new HashMap<>(),
+            1,
+            false,
+            1,
+            (short) 4,
+            0,
+            false
+        )));
+        List<KafkaConfig> configs = new ArrayList<>();
+        for (Properties props : brokerConfigs) {
+            configs.add(KafkaConfig.fromProps(props));
+        }
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    @BeforeEach
+    public void setUp(TestInfo info) {
+        super.setUp(info);
+        // create adminClient
+        Properties props = new Properties();
+        bootstrapServer = bootstrapServers(listenerName());
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        adminClient = Admin.create(props);
+        adminClient.updateFeatures(
+            Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
+                new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
+            new UpdateFeaturesOptions()
+        );
+        testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test");
+    }
+
+    @AfterEach
+    public void close() throws Exception {
+        if (adminClient != null) adminClient.close();
+        super.tearDown();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"kraft"})
+    public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws ExecutionException, InterruptedException {
+        adminClient.createTopics(
+            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
+        TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
+
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        // alter configs on target cluster
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+        Producer producer = null;
+        Consumer consumer = null;
+        try {
+            // check which partition is on broker 0 which we'll kill
+            TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName);
+            TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
+            List<Node> initialReplicas = topicPartitionInfo.replicas();
+            assertEquals(4, topicPartitionInfo.isr().size());
+            assertEquals(0, topicPartitionInfo.elr().size());
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+            Properties producerProps = new Properties();
+            producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+            // Use Ack=1 for the producer.
+            producerProps.put(ProducerConfig.ACKS_CONFIG, "1");
+            producer = new KafkaProducer(producerProps);
+
+            Properties consumerProps = new Properties();
+            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
+            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10");
+            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+            consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+            consumer = new KafkaConsumer<>(consumerProps);
+            consumer.subscribe(Collections.singleton(testTopicName));
+
+            producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
+            Thread.sleep(1000);
+            ConsumerRecords records = consumer.poll(Duration.ofSeconds(1L));
+            assertEquals(1, records.count());
+
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 2 && elrSize == 1;
+            });
+
+            // Now the partition is under min ISR. HWM should not advance.
+            producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
+            Thread.sleep(1000);

Review Comment:
   1 sec is quite long. How about 100ms?
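
   E.g. an untested sketch of that change; the assertion below only needs to verify that no record became visible, so a short pause should be enough:

   ```java
   // Now the partition is under min ISR. HWM should not advance.
   producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
   Thread.sleep(100);  // was 1000; 100ms keeps the test faster
   records = consumer.poll(Duration.ofSeconds(1L));
   assertEquals(0, records.count());
   ```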



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -335,7 +335,8 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
     public ControllerResult<BrokerRegistrationReply> registerBroker(
         BrokerRegistrationRequestData request,
         long newBrokerEpoch,
-        FinalizedControllerFeatures finalizedFeatures
+        FinalizedControllerFeatures finalizedFeatures,
+        boolean uncleanShutdownDetectionEnabled

Review Comment:
   uncleanShutdownDetectionEnabled => cleanShutdownDetectionEnabled ?
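
   I.e. roughly this rename (sketch):

   ```diff
   -        boolean uncleanShutdownDetectionEnabled
   +        boolean cleanShutdownDetectionEnabled
   ```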



##########
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##########
@@ -855,13 +866,65 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReRegistrationWithUncleanShutdownDetection(boolean isCleanShutdown) {

Review Comment:
   testReRegistrationWithUncleanShutdownDetection => testReRegistrationWithCleanShutdownDetection ?



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -460,39 +460,40 @@ public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
             int brokerToUncleanShutdown, brokerToBeTheLeader;
 
             // lastKnownElr stores the last known leader.
+            brokerToUncleanShutdown = lastKnownElr[0];
             if (lastKnownElr[0] == partition.elr[0]) {
-                brokerToUncleanShutdown = partition.elr[0];
                 brokerToBeTheLeader = partition.elr[1];
             } else {
-                brokerToUncleanShutdown = partition.elr[1];
                 brokerToBeTheLeader = partition.elr[0];
             }
 
-            // Unclean shutdown should remove the ELR members.
-            active.registerBroker(
+            // Unclean shutdown should remove brokerToUncleanShutdown from the ELR members, but it should still be in
+            // the lastKnownElr.
+            CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
                 anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
                 new BrokerRegistrationRequestData().
                     setBrokerId(brokerToUncleanShutdown).
                     setClusterId(active.clusterId()).
                     setFeatures(features).
                     setIncarnationId(Uuid.randomUuid()).
                     setLogDirs(Collections.singletonList(Uuid.randomUuid())).
-                    setListeners(listeners)).get();
+                    setListeners(listeners));
+            brokerEpochs.put(brokerToUncleanShutdown, reply.get().epoch());
+            partition = active.replicationControl().getPartition(topicIdFoo, 0);
+            assertArrayEquals(new int[]{brokerToBeTheLeader}, partition.elr, partition.toString());
+            assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());
 
             // Unclean shutdown should not remove the last known ELR members.
-            active.registerBroker(
+            reply = active.registerBroker(

Review Comment:
   reply is unused.
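
   A possible fix (sketch); either drop the assignment, or keep blocking on the returned future so the registration completes before the following assertions:

   ```diff
   -            reply = active.registerBroker(
   +            active.registerBroker(
   ```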



##########
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##########
@@ -855,13 +866,65 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReRegistrationWithUncleanShutdownDetection(boolean isCleanShutdown) {
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+            setFeatureControlManager(new FeatureControlManager.Builder().build()).
+            setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
+                if (!cleanShutdown) {
+                    records.add(new ApiMessageAndVersion(new PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
+                }
+            }).
+            build();
+        clusterControl.activate();
+        List<ApiMessageAndVersion> records = clusterControl.registerBroker(
+            new BrokerRegistrationRequestData().
+                setBrokerId(1).
+                setClusterId(clusterControl.clusterId()).
+                setIncarnationId(Uuid.fromString("mISEfEFwQIuaD1gKCc5tzQ")).
+                setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
+            100,
+            new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
+            true).
+                records();
+        records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+            setBrokerId(1).setBrokerEpoch(100).
+            setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+            (short) 1));
+        RecordTestUtils.replayAll(clusterControl, records);
+
+        records = clusterControl.registerBroker(
+            new BrokerRegistrationRequestData().
+                setBrokerId(1).
+                setClusterId(clusterControl.clusterId()).
+                setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
+                setPreviousBrokerEpoch(isCleanShutdown ? 100 : 10).
+                setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))),
+            111,
+            new FinalizedControllerFeatures(Collections.emptyMap(), 100L),
+            true).records();
+        RecordTestUtils.replayAll(clusterControl, records);
+        assertEquals(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww"),
+            clusterControl.brokerRegistrations().get(1).incarnationId());
+        assertFalse(clusterControl.brokerRegistrations().get(1).inControlledShutdown());
+        if (isCleanShutdown) {
+            assertEquals(100, clusterControl.brokerRegistrations().get(1).epoch());

Review Comment:
   This seems incorrect. Independent of whether the shutdown was clean or not, 
the new broker epoch should always be reflected after broker registration.
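
   A sketch of what that would look like (assuming the second registration above uses broker epoch 111, so the assertion no longer branches on isCleanShutdown):

   ```diff
   -        if (isCleanShutdown) {
   -            assertEquals(100, clusterControl.brokerRegistrations().get(1).epoch());
   +        assertEquals(111, clusterControl.brokerRegistrations().get(1).epoch());
   ```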



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1461,20 +1461,21 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMe
     }
 
     /**
-     * Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown.
+     * Create partition change records to remove replicas from any ISR or ELR for brokers when the shutdown is detected.
      *
-     * @param brokerId      The broker id.
-     * @param records       The record list to append to.
+     * @param brokerId           The broker id.

Review Comment:
   The broker id => The broker id to be shut down
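
   I.e.:

   ```diff
   -     * @param brokerId           The broker id.
   +     * @param brokerId           The broker id to be shut down.
   ```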



##########
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -0,0 +1,450 @@
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+        Producer producer = null;
+        Consumer consumer = null;
+        try {
+            // check which partition is on broker 0 which we'll kill

Review Comment:
   There is only 1 partition. Ditto below.
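
   E.g. the comment could read (sketch):

   ```diff
   -            // check which partition is on broker 0 which we'll kill
   +            // the topic has a single partition; record its replicas so we know which brokers to kill
   ```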



##########
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -0,0 +1,450 @@
+            consumer = new KafkaConsumer<>(consumerProps);
+            consumer.subscribe(Collections.singleton(testTopicName));
+
+            producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
+            Thread.sleep(1000);

Review Comment:
   Hmm, why do we need to sleep here? Could we use `waitUntil` ?
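
   An untested sketch using the `waitUntilTrue` helper this test already uses further down, instead of a fixed sleep:

   ```java
   producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
   Consumer finalConsumer = consumer;  // effectively final copy for the lambda
   kafka.utils.TestUtils.waitUntilTrue(
       () -> finalConsumer.poll(Duration.ofMillis(100L)).count() == 1,
       () -> "fail to consume the first record",
       org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
   );
   ```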



##########
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -0,0 +1,450 @@
+            // Now the partition is under min ISR. HWM should not advance.
+            producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get();
+            Thread.sleep(1000);
+            records = consumer.poll(Duration.ofSeconds(1L));
+            assertEquals(0, records.count());
+
+            // Restore the min ISR and the previous log should be visible.
+            startBroker(initialReplicas.get(1).id());
+            startBroker(initialReplicas.get(0).id());
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 4 && elrSize == 0;
+            });
+
+            Consumer finalConsumer = consumer;
+            kafka.utils.TestUtils.waitUntilTrue(
+                () -> {
+                    try {
+                        ConsumerRecords record = finalConsumer.poll(Duration.ofMillis(100L));
+                        return record.count() == 1;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                },
+                () -> "fail to consume messages",
+                org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
+            );
+        } finally {
+            restartDeadBrokers(false);
+            if (consumer != null) consumer.close();
+            if (producer != null) producer.close();
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"kraft"})
+    public void testElrMemberCanBeElected(String quorum) throws ExecutionException, InterruptedException {
+        adminClient.createTopics(
+            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
+        TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
+
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        // alter configs on target cluster
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+
+        try {
+            // check which partition is on broker 0 which we'll kill
+            TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName);
+            TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
+            List<Node> initialReplicas = topicPartitionInfo.replicas();
+            assertEquals(4, topicPartitionInfo.isr().size());
+            assertEquals(0, topicPartitionInfo.elr().size());
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 1 && elrSize == 2;
+            });
+
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(2, topicPartitionInfo.elr().size());

Review Comment:
   This seems unnecessary since it's covered by the `waitForIsrAndElr` call 
earlier?
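
   I.e. these lines could be dropped (sketch; the next step re-fetches the description anyway):

   ```diff
   -            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
   -                .allTopicNames().get().get(testTopicName).partitions().get(0);
   -            assertEquals(2, topicPartitionInfo.elr().size());
   ```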



##########
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -0,0 +1,450 @@
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 1 && elrSize == 2;
+            });
+
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(2, topicPartitionInfo.elr().size());
+
+            killBroker(initialReplicas.get(3).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 0 && elrSize == 3;
+            });
+
+            topicPartitionInfo = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(1, topicPartitionInfo.lastKnownElr().size(), 
topicPartitionInfo.toString());
+            int expectLastKnownLeader = initialReplicas.get(3).id();
+            assertEquals(expectLastKnownLeader, 
topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString());
+
+            // At this point all replicas have failed; the last known leader is the last
+            // broker killed and the ELR has 3 members.
+            // Restart one of the other ELR brokers and it should become the leader.
+
+            int expectLeader = topicPartitionInfo.elr().stream()
+                .filter(node -> node.id() != expectLastKnownLeader).collect(Collectors.toList()).get(0).id();
+
+            startBroker(expectLeader);
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 1 && elrSize == 2;
+            });
+
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
+            assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString());
+
+            // Start another 2 brokers; the ELR fields should then be cleared.
+            topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2)
+                .forEach(node -> startBroker(node.id()));
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 3 && elrSize == 0;
+            });
+
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
+            assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString());
+        } finally {
+            restartDeadBrokers(false);
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"kraft"})
+    public void testElrMemberShouldBeKickOutWhenUncleanShutdown(String quorum) throws ExecutionException, InterruptedException {
+        adminClient.createTopics(
+            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
+        TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
+
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        // raise min.insync.replicas to 3 so the ELR is populated once the ISR shrinks below it
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+
+        try {
+            // capture the initial replica assignment before we start killing brokers
+            TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName);
+            TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
+            List<Node> initialReplicas = topicPartitionInfo.replicas();
+            assertEquals(4, topicPartitionInfo.isr().size());
+            assertEquals(0, topicPartitionInfo.elr().size());
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+            killBroker(initialReplicas.get(3).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 0 && elrSize == 3;
+            });
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+
+            int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id();
+            KafkaBroker broker = brokers().find(b -> {
+                return b.config().brokerId() == brokerToBeUncleanShutdown;
+            }).get();
+            Seq<File> dirs = broker.logManager().liveLogDirs();
+            assertEquals(1, dirs.size());
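+            // A cleanly stopped broker leaves a clean-shutdown marker file in its log dir;
+            // deleting the file makes the next startup look like an unclean shutdown.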
+            CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
+            assertTrue(handler.exists());
+            assertDoesNotThrow(() -> handler.delete());
+
+            // With the clean shutdown file removed, the broker reports an unclean shutdown
+            // on restart and is kicked out of the ELR.
+            startBroker(brokerToBeUncleanShutdown);
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 0 && elrSize == 2;
+            });
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            assertTrue(topicPartitionInfo.leader() == null);
+            assertEquals(1, topicPartitionInfo.lastKnownElr().size());
+        } finally {
+            restartDeadBrokers(false);
+        }
+    }
+
+    /*
+        This test is only valid for KIP-966 part 1. Once unclean recovery is implemented,
+        it should be removed.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"kraft"})
+    public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws ExecutionException, InterruptedException {
+        adminClient.createTopics(
+            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 4))).all().get();
+        TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
+
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        // raise min.insync.replicas to 3 so the ELR is populated once the ISR shrinks below it
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+
+        try {
+            // capture the initial replica assignment before we start killing brokers
+            TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName);
+            TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0);
+            List<Node> initialReplicas = topicPartitionInfo.replicas();
+            assertEquals(4, topicPartitionInfo.isr().size());
+            assertEquals(0, topicPartitionInfo.elr().size());
+            assertEquals(0, topicPartitionInfo.lastKnownElr().size());
+
+            killBroker(initialReplicas.get(0).id());
+            killBroker(initialReplicas.get(1).id());
+            killBroker(initialReplicas.get(2).id());
+            killBroker(initialReplicas.get(3).id());
+
+            waitForIsrAndElr((isrSize, elrSize) -> {
+                return isrSize == 0 && elrSize == 3;
+            });
+            topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
+                .allTopicNames().get().get(testTopicName).partitions().get(0);
+            int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id();
+
+            brokers().foreach(broker -> {

Review Comment:
   One of the brokers is not killed. We should avoid deleting its file, right?
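
   Something like this could work (a sketch only: `killedIds` is a new local and it assumes
   an extra `java.util.Set` import; `initialReplicas`, `brokers()`, and the deletion calls
   are the ones already used in this test):

   ```java
   // Only delete the clean-shutdown marker of the brokers that were actually
   // killed, so the still-running broker keeps its file.
   Set<Integer> killedIds = initialReplicas.stream()            // the 4 replicas killed above
       .map(Node::id)
       .collect(Collectors.toSet());

   JavaConverters.seqAsJavaList(brokers()).stream()
       .filter(b -> killedIds.contains(b.config().brokerId()))  // skips the live broker
       .forEach(b -> {
           String logDir = b.logManager().liveLogDirs().apply(0).toString();
           assertDoesNotThrow(() -> new CleanShutdownFileHandler(logDir).delete());
       });
   ```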


