clarkwtc commented on code in PR #19154: URL: https://github.com/apache/kafka/pull/19154#discussion_r2008774910
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java: ########## @@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) { // Since the brokers cached during the bootstrap are offline, the admin client needs to wait the default timeout for other threads. admin.close(Duration.ZERO); } + + public void consumerRebootstrap(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS); + + var part = 0; + var broker0 = 0; + var broker1 = 1; + var partitions = List.of(new TopicPartition(TOPIC, part)); + + try (var producer = clusterInstance.producer()) { + var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 0".getBytes(), "value 0".getBytes())).get(); + assertEquals(0, recordMetadata.offset()); + producer.flush(); + } + + clusterInstance.shutdownBroker(broker0); + + try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + // Only the server 1 is available for the consumer during the bootstrap. + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count()); + + // Bring back the server 0 and shut down 1. + clusterInstance.shutdownBroker(broker1); + clusterInstance.startBroker(broker0); + + try (var producer = clusterInstance.producer()) { + var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 1".getBytes(), "value 1".getBytes())).get(); + assertEquals(1, recordMetadata.offset()); + producer.flush(); + } + + // The server 1 originally cached during the bootstrap, is offline. + // However, the server 0 from the bootstrap list is online. + assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count()); Review Comment: I fixed it. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java: ########## @@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) { // Since the brokers cached during the bootstrap are offline, the admin client needs to wait the default timeout for other threads. admin.close(Duration.ZERO); } + + public void consumerRebootstrap(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException { + clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS); + + var part = 0; + var broker0 = 0; + var broker1 = 1; + var partitions = List.of(new TopicPartition(TOPIC, part)); + + try (var producer = clusterInstance.producer()) { + var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 0".getBytes(), "value 0".getBytes())).get(); Review Comment: I fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org