C0urante commented on code in PR #16757:
URL: https://github.com/apache/kafka/pull/16757#discussion_r1700785965
##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -153,6 +154,49 @@ public void start() {
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
+
+ verifyClusterReadiness();
+ }
+
+ /**
+ * Perform an extended check to ensure that the primary APIs of the cluster are available, including:
+ * <ul>
+ * <li>Ability to create a topic</li>
+ * <li>Ability to produce to a topic</li>
+ * <li>Ability to form a consumer group</li>
+ * <li>Ability to consume from a topic</li>
+ * </ul>
+ * If this method completes successfully, all resources created to verify the cluster health
+ * (such as topics and consumer groups) will be cleaned up before it returns.
+ * <p>
+ * This provides extra guarantees compared to other cluster readiness checks such as
+ * {@link ConnectAssertions#assertExactlyNumBrokersAreUp(int, String)} and
+ * {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have
+ * completed startup and joined the cluster, but do not verify that the internal consumer
+ * offsets topic has been created or that it's actually possible for users to create and
+ * interact with topics.
+ */
+ public void verifyClusterReadiness() {
+     String consumerGroupId = UUID.randomUUID().toString();
+     Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+     String topic = "consumer-warmup-" + consumerGroupId;
+
+     createTopic(topic);
+     produce(topic, "warmup message key", "warmup message value");
+
+     try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
+         ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(GROUP_COORDINATOR_AVAILABILITY_DURATION_MS));
+         if (records.isEmpty()) {
+             throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time");
+         }
+     }
+
+     try (Admin admin = createAdminClient()) {
+         admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS);
+         admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
+     } catch (InterruptedException | ExecutionException | TimeoutException e) {
+         throw new AssertionError("Failed to clean up cluster health check resource(s)", e);
Review Comment:
For the first comment, we already wrap things in a try-with-resources block in order to automatically close the consumer when the scope is exited, and it keeps things simpler to add a corresponding catch clause.

I guess we could do this for produce, but I'd rather not touch that logic since it's not modified by this PR. If you or someone else wants to do a comprehensive refactor of our tests to adopt this style, I'd be happy to take a look, though!
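As an aside, the pattern the review describes (a try-with-resources block with a corresponding catch clause) can be sketched in isolation. This is a hypothetical, self-contained example using a `FakeConsumer` stand-in, not code from the PR: the key behavior is that the resource's `close()` runs before the catch body executes, so the consumer is released even when the readiness check throws.

```java
// Hypothetical sketch (not from the PR): try-with-resources plus a catch
// clause, the style discussed in the review comment above.
public class WarmupExample {

    // Stand-in AutoCloseable resource, analogous to the KafkaConsumer.
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;

        int poll() {
            return 0; // pretend no records were returned
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static FakeConsumer lastConsumer;

    static String verify() {
        try (FakeConsumer consumer = new FakeConsumer()) {
            lastConsumer = consumer;
            if (consumer.poll() == 0) {
                throw new IllegalStateException("no records");
            }
            return "ok";
        } catch (IllegalStateException e) {
            // By the time control reaches this catch clause, the
            // try-with-resources machinery has already closed the consumer.
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(verify());
        System.out.println(lastConsumer.closed); // closed despite the exception
    }
}
```

The same shape would let `verifyClusterReadiness` report a cleanup failure without leaking the consumer, which is the simplicity the reviewer is pointing at.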
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]