clarkwtc commented on code in PR #19154:
URL: https://github.com/apache/kafka/pull/19154#discussion_r2006735714


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java:
##########
@@ -94,4 +101,129 @@ public void testAdminRebootstrapDisabled(ClusterInstance 
clusterInstance) {
         // Since the brokers cached during the bootstrap are offline, the 
admin client needs to wait the default timeout for other threads.
         admin.close(Duration.ZERO);
     }
+
+    public void consumerRebootstrap(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+        var part = 0;
+        var broker0 = 0;
+        var broker1 = 1;
+        var partitions = List.of(new TopicPartition(TOPIC, part));
+
+        try (var producer = clusterInstance.producer()) {
+            var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
part, "key 0".getBytes(), "value 0".getBytes())).get();
+            assertEquals(0, recordMetadata.offset());
+            producer.flush();
+        }
+
+        clusterInstance.shutdownBroker(broker0);
+
+        try (var consumer = 
clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name))) {
+            // Only the server 1 is available for the consumer during the 
bootstrap.
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+            assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+            // Bring back the server 0 and shut down 1.
+            clusterInstance.shutdownBroker(broker1);
+            clusterInstance.startBroker(broker0);
+
+            try (var producer = clusterInstance.producer()) {
+                var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
part, "key 1".getBytes(), "value 1".getBytes())).get();
+                assertEquals(1, recordMetadata.offset());
+                producer.flush();
+            }
+
+            // The server 1 originally cached during the bootstrap, is offline.
+            // However, the server 0 from the bootstrap list is online.
+            assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+        }
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+        })
+    public void testClassicConsumerRebootstrap(ClusterInstance 
clusterInstance) throws InterruptedException, ExecutionException {
+        consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest(
+        brokers = REPLICAS,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+        })
+    public void testConsumerRebootstrap(ClusterInstance clusterInstance) 
throws InterruptedException, ExecutionException {
+        consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
+    }
+
+    public void consumerRebootstrapDisabled(ClusterInstance clusterInstance, 
GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
+        clusterInstance.createTopic(TOPIC, 1, (short) REPLICAS);
+
+        var part = 0;
+        var broker0 = 0;
+        var broker1 = 1;
+        var tp = new TopicPartition(TOPIC, part);
+
+        try (var producer = clusterInstance.producer()) {
+            var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
part, "key 0".getBytes(), "value 0".getBytes())).get();
+            assertEquals(0, recordMetadata.offset());
+            producer.flush();
+        }
+
+        clusterInstance.shutdownBroker(broker0);
+
+        try (var consumer = clusterInstance.consumer(Map.of(
+            CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
+            ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
+        )) {
+            // Only the server 1 is available for the consumer during the 
bootstrap.
+            consumer.assign(List.of(tp));
+            consumer.seekToBeginning(List.of(tp));
+            assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
+
+            // Bring back the server 0 and shut down 1.
+            clusterInstance.shutdownBroker(broker1);
+            clusterInstance.startBroker(broker0);
+
+            try (var producer = clusterInstance.producer()) {
+                var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, 
part, "key 1".getBytes(), "value 1".getBytes())).get();
+                assertEquals(1, recordMetadata.offset());
+                producer.flush();

Review Comment:
   Maybe we should remove the call `get`?
   I need to call `flush`; without it, if the broker restarts on rebootstrap, 
the consumer polls nothing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to